練習 - 建立 Kafka 生產者
既然已部署 Kafka 和 Spark 叢集,那麼讓我們將 Kafka 生產者新增至 Kafka 前端節點。 此生產者是股票價格模擬器,其會產生人工股票價格。
下載範例
- 在您的網際網路瀏覽器中,前往 https://github.com/Azure/hdinsight-mslearn,並在本機下載或複製範例,如果您尚未在先前的課程模組中這樣做的話。
- 在本機開啟 Spark 結構化串流\python-producer-simulator-template.py 檔案。
取出 Kafka 訊息代理程式 URL
接下來,您需要取得 Kafka 訊息代理程式 URL,方法是在前端節點上使用 ssh,並將這些 URL 新增至 python 檔案。
若要連線到 Apache Kafka 叢集的主要前端節點,您需要透過 ssh 連線到節點。 Azure 入口網站中的 Azure Cloud Shell 是建議的連線方式。 在 Azure 入口網站中,按一下頂端工具列中的 [Azure Cloud Shell] 按鈕,然後選取 [Bash]。 您也可以使用已啟用 ssh 的命令提示字元,例如 Git Bash。
如果您之前未使用過 Azure Cloud Shell,則會顯示一則通知,指出您沒有裝載任何儲存體。 從 [訂用帳戶] 方塊中選取您的 Azure 訂用帳戶,然後按一下 [建立儲存體]。
在雲端提示中,貼上下列命令。 將
sshuser
取代為 SSH 使用者名稱。 將kafka-mslearn-stock
取代為 Apache Kafka 叢集的名稱,並請注意,您必須在叢集名稱之後包含 -ssh。ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
當您初次連線到叢集時,您的 SSH 用戶端可能會顯示警告,指出無法確認主機的真確性。 在系統提示時,輸入 yes,然後按 Enter 鍵,以將主機新增至 SSH 用戶端信任的伺服器清單。
出現提示時,請輸入 SSH 使用者的密碼。
連線之後,您會看到類似下列文字的資訊:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
安裝 jq,這是命令列 JSON 處理器。 此公用程式可用來剖析 JSON 文件,而且在剖析主機資訊時很有用。 從開啟的 SSH 連線,輸入下列命令來安裝
jq
:sudo apt -y install jq
設定密碼變數。 請將
PASSWORD
取代為叢集登入密碼,然後輸入下列命令:export password='PASSWORD'
擷取正確大小寫的叢集名稱。 視叢集的建立方式而定,叢集名稱的實際大小寫可能與您預期的不同。 此命令會取得實際的大小寫,然後將其儲存在變數中。 輸入下列命令:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
此命令沒有回應。
若要使用 Zookeeper 主機資訊設定環境變數,請使用以下命令。 此命令會擷取所有的 Zookeeper 主機,然後只傳回前兩個項目。 這是因為考量備援之故,以防某一部主機無法連線。
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
注意
此命令需要 Ambari 存取權。 如果您的叢集位於 NSG 後方,請從可存取 Ambari 的機器執行此命令。
此命令也沒有回應。
若要確認是否已正確設定環境變數,請使用下列命令:
echo $KAFKAZKHOSTS
此命令會傳回類似以下文字的資訊:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
若要使用 Apache Kafka 訊息代理程式主機資訊來設定環境變數,請使用下列命令:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
注意
此命令需要 Ambari 存取權。 如果您的叢集位於 NSG 後方,請從可存取 Ambari 的機器執行此命令。
此命令沒有輸出。
若要確認是否已正確設定環境變數,請使用下列命令:
echo $KAFKABROKERS
此命令會傳回類似以下文字的資訊:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
將上一個步驟中所傳回的其中一個 Kafka 值複製到第 19 行上的 python-producer-simulator-template.py 檔案,並在該值前後加上單引號,例如:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
儲存 python-producer-simulator-template-simulator-template.py 檔案。
回到 ssh 連線視窗,請使用下列命令來建立主題。
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
此命令會使用 KAFKAZKHOSTS 中儲存的主機資訊連線到 Zookeeper。 然後,其會建立名為 stockVals 的 Apache Kafka 主題,以符合 python-producer-simulator-template.py 中的主題名稱。
將 python 檔案複製到前端節點,並執行該檔案以串流資料
在新的 git 視窗中,瀏覽至 python-producer-simulator-template.py 檔案的位置,然後使用下列命令,將該檔案從本機電腦複製到主要前端節點。 將
kafka-mslearn-stock
取代為 Apache Kafka 叢集的名稱,並請注意,您必須在叢集名稱之後包含 -ssh。scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
當系統詢問您是否要繼續連線時,請鍵入 yes。 然後,在提示中輸入叢集的密碼。 在檔案傳輸之後,會顯示下列輸出。
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
現在會切回 Azure 命令提示字元,您可在其中取出訊息代理程式資訊,並執行下列命令來安裝 Kafka:
sudo pip install kafka-python
在 Kafka 成功安裝之後,就會顯示下列輸出。
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
在相同的視窗中,使用下列命令來安裝要求:
sudo apt-get install python-requests
當系統詢問「在此作業之後,將會使用 4327 kB 的額外磁碟空間。 您要繼續嗎? [Y/n]」時,請鍵入 y。
當要求安裝成功時,就會顯示如下的輸出。
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
在同一視窗中,使用下列命令來執行 python 檔案
python python-producer-simulator-template.py
您應該會看到如下輸出:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
此輸出會提供 python-producer-simulated-template.py 檔中所列股票的模擬股票價格,後面接著主題、分割區,以及主題中訊息的位移。 您可以看到每次觸發生產者 (每秒),就會產生新的一批股票價格,並將每則新訊息新增至特定位移的分割區。