搭配使用 HDInsight 上的 Apache Kafka 與 Azure IoT 中樞
了解如何使用 Apache Kafka Connect Azure IoT 中樞連接器在 HDInsight 上的 Apache Kafka 與 Azure IoT 中樞之間移動資料。 在本文件中,您將了解如何從叢集中的邊緣節點執行 IoT 中樞連接器。
Kafka Connect API 可讓您實作連接器,以持續將數據提取至 Kafka,或將數據從 Kafka 推送到另一個系統。 Apache Kafka Connect Azure IoT 中樞是會將資料從 Azure IoT 中樞提取到 Kafka 中的連接器。 它也可以將資料從 Kafka 推送到 IoT 中樞。
從 IoT 中樞提取時,您會使用來源連接器。 推送至 IoT 中樞時,您會使用接收連接器。 IoT 中樞連接器同時提供來源和接收連接器。
下圖顯示在使用連接器時,Azure IoT 中樞與 HDInsight 上的 Kafka 之間的資料流程。
如需如何連接 API 的詳細資訊,請參閱 https://kafka.apache.org/documentation/#connect。
必要條件
HDInsight 上的 Apache Kafka 叢集。 如需詳細資訊,請參閱 HDInsight 上的 Kafka 快速入門 檔。
Kafka 叢集中的邊緣節點。 如需詳細資訊,請參閱 搭配 HDInsight 使用邊緣節點檔。
SSH 用戶端。 如需詳細資訊,請參閱使用 SSH 連線至 HDInsight (Apache Hadoop)。
Azure IoT 中樞和裝置。 在本文中,請考慮使用將 Raspberry Pi 線上模擬器連線至 Azure IoT Hub。
建置連接器
從 https://github.com/Azure/toketi-kafka-connect-iothub/ 將連接器的來源下載到您的本機環境。
如需命令提示字元,請瀏覽至
toketi-kafka-connect-iothub-master
目錄。 然後使用下列命令建置和封裝專案:sbt assembly
建置需要幾分鐘的時間才能完成。 命令會在專案的
toketi-kafka-connect-iothub-master\target\scala-2.11
目錄中建立名為kafka-connect-iothub-assembly_2.11-0.7.0.jar
的檔案。
安裝連接器
將 .jar 檔案上傳至 HDInsight 上 Kafka 叢集的邊緣節點。 以您叢集的實際名稱取代
CLUSTERNAME
,以編輯下列命令。 SSH 使用者帳戶和邊緣節點名稱的預設值會視需要修改。scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
檔案複製完成後,請使用 SSH 連線至邊緣節點:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
若要將連接器安裝到 Kafka
libs
目錄中,請使用下列命令:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
針對其餘步驟,讓 SSH 連線保持為使用中。
設定 Apache Kafka
透過邊緣節點的 SSH 連線,使用下列步驟設定在獨立模式中執行連接器的 Kafka:
設定密碼變數。 請將 PASSWORD 取代為叢集登入密碼,然後輸入下列命令:
export password='PASSWORD'
安裝 jq 公用程式。 jq 有助於處理 Ambari 查詢所傳回的 JSON 文件。 輸入下列命令:
sudo apt -y install jq
取得 Kafka 訊息代理程式的位址。 您的叢集中可能會有許多訊息代理程式,但您只需要參考其中一或兩個。 若要取得兩個訊息代理程式主機的位址,請使用下列命令:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
複製這些值以供稍後使用。 傳回的值類似下列文字︰
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
取得 Apache Zookeeper 節點的位址。 叢集中可能會有數個 Zookeeper 節點,但您只需要參考其中一或兩個。 使用下列命令將位址儲存在變數
KAFKAZKHOSTS
中:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
當您以獨立模式執行連接器時,
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
檔案會用來與 Kafka 訊息代理程式通訊。 若要編輯connect-standalone.properties
檔案,請使用下列命令:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
進行下列編輯:
目前的值 新值 註解 bootstrap.servers=localhost:9092
將 localhost:9092
值取代為先前步驟中的訊息代理程式主機設定為讓邊緣節點找出 Kafka 訊息代理程式的獨立組態。 key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
此變更可讓您使用隨附於 Kafka 的主控台產生者進行測試。 對於其他產生者和取用者,您可能需要不同的轉換器。 如需使用其他轉換器值的相關資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。 value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
與指定相同。 N/A consumer.max.poll.records=10
新增檔案結尾。 此變更可將接收連接器限定為一次 10 筆記錄以內,以防止連接器逾時。 如需詳細資訊,請參閱https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。 若要儲存檔案,請使用 Ctrl + X、Y 和 Enter 鍵。
若要建立連接器所使用的主題,請使用下列命令:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
若要確認
iotin
和iotout
主題存在,請使用下列命令:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
iotin
主題可用來接收來自 IoT 中樞的訊息。iotout
主題可用來將訊息傳送至 IoT 中樞。
取得 IoT 中樞連線資訊
若要擷取連接器所使用 IoT 中樞資訊,請使用下列步驟:
取得 IoT 中樞的事件中樞相容端點和事件中樞相容端點名稱。 若要取得這項資訊,請使用下列其中一個方法:
在 Azure 入口網站中,使用下列步驟:
瀏覽至您的 IoT 中樞,並選取 [端點]。
在 [內建端點] 中,選取 [事件]。
在 [屬性] 中,複製下列欄位的值:
- 事件中樞相容名稱
- 事件中樞相容端點
- 資料分割
重要
入口網站中的端點值可能包含在此範例中不需要的多餘文字。 請擷取符合
sb://<randomnamespace>.servicebus.windows.net/
模式的文字。
在 Azure CLI 中,使用下列命令:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
請將
myhubname
取代為您的 IoT 中樞名稱。 回應會類似於下列文字:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
取得共用存取原則和金鑰。 在此範例中,請使用服務金鑰。 若要取得這項資訊,請使用下列其中一個方法:
在 Azure 入口網站中,使用下列步驟:
- 選取 [共用存取原則],然後選取 [服務]。
- 複製 [主要金鑰] 值。
- 複製 [連接字串 – 主要金鑰] 的值。
在 Azure CLI 中,使用下列命令:
若要取得主要金鑰值,請使用下列命令:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
請將
myhubname
取代為您的 IoT 中樞名稱。 回應是此中樞之service
原則的主要金鑰。若要取得
service
原則的連接字串,請使用下列命令:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
請將
myhubname
取代為您的 IoT 中樞名稱。 回應是service
原則的連接字串。
設定來源連線
若要設定使用 IoT 中樞時的來源,請透過邊緣節點的 SSH 連線執行下列動作:
在
/usr/hdp/current/kafka-broker/config/
目錄中建立connect-iot-source.properties
檔案的複本。 若要從 toketi-kafka-connect-iothub 專案下載檔案,請使用下列命令:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
若要編輯
connect-iot-source.properties
檔案並新增 IoT 中樞資訊,請使用下列命令:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
在編輯器中找出下列項目,並加以變更:
目前的值 編輯 Kafka.Topic=PLACEHOLDER
把 PLACEHOLDER
替換為iotin
。 從 IoT 中樞接收到的訊息會放在iotin
主題中。IotHub.EventHubCompatibleName=PLACEHOLDER
將 PLACEHOLDER
取代為事件中樞相容名稱。IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
將 PLACEHOLDER
取代為事件中樞相容端點。IotHub.AccessKeyName=PLACEHOLDER
把 PLACEHOLDER
替換為service
。IotHub.AccessKeyValue=PLACEHOLDER
將 PLACEHOLDER
取代為service
原則的主要金鑰。IotHub.Partitions=PLACEHOLDER
將 PLACEHOLDER
取代為前述步驟中的分割區數目。IotHub.StartTime=PLACEHOLDER
將 PLACEHOLDER
取代為 UTC 日期。 此日期是連接器開始檢查訊息的時間。 日期格式為yyyy-mm-ddThh:mm:ssZ
。BatchSize=100
把 100
替換為5
。 此變更會使連接器在 IoT 中樞內有五個新訊息之後,將訊息讀取到 Kafka 中。如需範例設定,請參閱適用於 Azure IoT 中樞 的 Kafka 連線來源連接器。
若要儲存變更,請使用 Ctrl + X、Y 和 Enter 鍵。
如需關於設定連接器來源的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md。
設定接收連線
若要設定使用 IoT 中樞時的接收連線,請透過邊緣節點的 SSH 連線執行下列動作:
在
/usr/hdp/current/kafka-broker/config/
目錄中建立connect-iothub-sink.properties
檔案的複本。 若要從 toketi-kafka-connect-iothub 專案下載檔案,請使用下列命令:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
若要編輯
connect-iothub-sink.properties
檔案並新增 IoT 中樞資訊,請使用下列命令:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
在編輯器中找出下列項目,並加以變更:
目前的值 編輯 topics=PLACEHOLDER
把 PLACEHOLDER
替換為iotout
。 寫入iotout
主題的訊息會轉送至 IoT 中樞。IotHub.ConnectionString=PLACEHOLDER
將 PLACEHOLDER
取代為service
原則的連接字串。如需範例設定,請參閱適用於 Azure IoT 中樞 的 Kafka 連線接收連接器。
若要儲存變更,請使用 Ctrl + X、Y 和 Enter 鍵。
如需關於設定連接器接收端的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。
啟動來源連接器
若要啟動來源連接器,請透過邊緣節點的 SSH 連線使用下列命令:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
連接器啟動後,請從您的裝置將訊息傳送至 IoT 中樞。 連接器從 IoT 中樞讀取訊息並將其儲存於 Kafka 主題時,會將資訊記錄至主控台:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
注意
當連接器啟動時,您可能會看到幾則警告。 這些警告不會導致從 IoT 中樞接收訊息的問題。
使用 Ctrl + C 兩次後,過幾分鐘停止連接器。 連接器需要幾分鐘的時間才能停止。
啟動接收連接器
透過邊緣節點的 SSH 連線,使用下列命令在獨立模式中啟動接收連接器:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
連接器執行時,會顯示類似下列文字的資訊:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
注意
當連接器啟動時,您可能會看到幾則警告。 您可以放心地忽略這些警告。
傳送訊息
若要透過連接器傳送訊息,請使用下列步驟:
開啟 Kafka 叢集的另一個 SSH 工作階段:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
取得新 ssh 工作階段的 Kafka 訊息代理程式位址。 請將 PASSWORD 取代為叢集登入密碼,然後輸入下列命令:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
若要將訊息傳送至
iotout
主題,請使用下列命令:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
此命令不會使您返回一般 Bash 提示字元。 它會將鍵盤輸入傳送至
iotout
主題。若要將訊息傳送至您的裝置,請將 JSON 文件貼到
kafka-console-producer
的 SSH 工作階段中。重要
您必須將
"deviceId"
項目的值設定為您的裝置識別碼。 在下列範例中,裝置會命名為myDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
此 JSON 文件的結構描述在 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md 有更詳細的說明。
如果您使用仿真的Raspberry Pi裝置,而且它正在執行中,裝置會記錄下列訊息。
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
如需使用接收連接器的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。
下一步
在本文件中,您已了解如何使用 Apache Kafka Connect API 在 HDInsight 上啟動 IoT Kafka Connector。 使用下列連結來探索使用 Kafka 的其他方式︰