练习 - 创建 Kafka 制作者
现在已部署了 Kafka 和 Spark 群集,接下来可将 Kafka 制作者添加到 Kafka 头节点。 该制作者是股票价格模拟器,它生成人为股票价格。
下载示例
- 在 Internet 浏览器中转到 https://github.com/Azure/hdinsight-mslearn,并在本地下载或克隆示例(如果在上一模块中未执行此操作)。
- 在本地打开 Spark Structured Streaming\python-producer-simulator-template.py 文件。
检索 Kafka 中转站 URL
接下来,在头节点上使用 ssh 并将 URL 添加到 Python 文件,你需要通过这种方式来检索 Kafka 中转站 URL。
若要连接到 Apache Kafka 群集的主头节点,需要通过 ssh 连接到该节点。 建议使用 Azure 门户中的 Azure Cloud Shell 进行连接。 在 Azure 门户中,单击顶部工具栏中的“Azure Cloud Shell”按钮,然后选择“Bash”。 还可以使用启用了 ssh 的命令提示符,如 Git Bash。
如果之前尚未使用过 Azure Cloud Shell,则会显示指出你没有装载存储的通知。 从“订阅”框中选择 Azure 订阅,然后单击“创建存储”。
在云提示下,粘贴以下命令。 将
sshuser
替换为 SSH 用户名。 将kafka-mslearn-stock
替换为 Apache Kafka 群集的名称,请注意,必须在群集名称后附上 -ssh。ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
首次连接到群集时,SSH 客户端可能会显示一个警告,提示无法验证主机。 当系统提示时,请键入“yes”,然后按 Enter,将主机添加到 SSH 客户端的受信任服务器列表 。
出现提示时,请输入 SSH 用户名密码。
连接后,显示的信息类似于以下文本:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
安装 jq,一个命令行 JSON 处理程序。 此实用程序用于分析 JSON 文档和主机信息。 在打开的 SSH 连接中,输入以下命令以安装
jq
:sudo apt -y install jq
设置密码变量。 将
PASSWORD
替换为群集登录密码,然后输入以下命令:export password='PASSWORD'
提取具有正确大小写格式的群集名称。 群集名称的实际大小写格式可能出乎预期,具体取决于群集的创建方式。 此命令将获取实际的大小写,然后将其存储在变量中。 输入以下命令:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
此命令无响应。
若要使用 Zookeeper 主机信息来设置环境变量,请使用以下命令。 此命令检索所有 Zookeeper 主机,然后仅返回前两个条目。 这是由于某个主机无法访问时,需要一些冗余。
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
注意
此命令需要 Ambari 访问权限。 如果群集位于 NSG 后面,请在可访问 Ambari 的计算机上运行此命令。
此命令也没有响应。
若要验证是否已正确设置了环境变量,请使用以下命令:
echo $KAFKAZKHOSTS
此命令返回类似于以下文本的信息:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
若要使用 Apache Kafka 代理主机信息来设置环境变量,请使用以下命令:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
注意
此命令需要 Ambari 访问权限。 如果群集位于 NSG 后面,请在可访问 Ambari 的计算机上运行此命令。
此命令没有输出。
若要验证是否已正确设置了环境变量,请使用以下命令:
echo $KAFKABROKERS
此命令返回类似于以下文本的信息:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
将上一步返回的某个 Kafka 中转站值复制到第 19 行的 python-producer-simulator-template.py 文件中,并将该值用单引号引起来,例如:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
保存 python-producer-simulator-template-simulator-template.py 文件。
返回到 ssh 连接窗口,使用以下命令创建主题。
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
此命令使用存储在 $KAFKAZKHOSTS 中的主机信息连接到 Zookeeper。 然后创建一个名为 stockVals 的 Apache Kafka 主题,以匹配 python-producer-simulator-template.py 中的主题名称。
将 Python 文件复制到头节点,并运行文件以流式处理数据
在新的 Git 窗口中,导航到 python-producer-simulator-template.py 文件所在的位置,并使用以下命令将该文件从本地计算机复制到主头节点。 将
kafka-mslearn-stock
替换为 Apache Kafka 群集的名称,请注意,必须在群集名称后附上 -ssh。scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
当系统询问是否要继续连接时,请键入“是”。 然后在出现提示时,输入群集的密码。 文件传输后,将显示以下输出。
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
现在切换回检索代理信息的 Azure 命令提示符,并运行以下命令安装 Kafka:
sudo pip install kafka-python
Kafka 安装成功后,将显示以下输出。
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
在同一个窗口中,使用以下命令安装请求:
sudo apt-get install python-requests
当系统询问“此操作后,将使用 4,327 kB 的额外磁盘空间。 是否继续? [Y/n]”输入 y。
请求成功安装后,将显示类似于以下内容的输出。
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
在同一个窗口中,使用以下命令运行 Python 文件
python python-producer-simulator-template.py
应该会看到与下面类似的输出:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
此输出提供了 python-producer-simulated-template.py 文件中列出的股票的模拟股票价格,然后是主题、分区和主题中消息的偏移量。 你可以看到,每次触发制作者时(每秒),都会生成新一批股票价格,并且每个新消息都在特定偏移量处被添加到分区中。