共用方式為


搭配使用 HDInsight 上的 Apache Kafka 與 Azure IoT 中樞

了解如何使用 Apache Kafka Connect Azure IoT 中樞連接器在 HDInsight 上的 Apache Kafka 與 Azure IoT 中樞之間移動資料。 在本文件中,您將了解如何從叢集中的邊緣節點執行 IoT 中樞連接器。

Kafka Connect API 可讓您實作連接器,以持續將數據提取至 Kafka,或將數據從 Kafka 推送到另一個系統。 Apache Kafka Connect Azure IoT 中樞是會將資料從 Azure IoT 中樞提取到 Kafka 中的連接器。 它也可以將資料從 Kafka 推送到 IoT 中樞。

從 IoT 中樞提取時,您會使用來源連接器。 推送至 IoT 中樞時,您會使用接收連接器。 IoT 中樞連接器同時提供來源和接收連接器。

下圖顯示在使用連接器時,Azure IoT 中樞與 HDInsight 上的 Kafka 之間的資料流程。

顯示透過連接器從 IoT 中樞 流向 Kafka 之資料的影像。

如需如何連接 API 的詳細資訊,請參閱 https://kafka.apache.org/documentation/#connect

必要條件

建置連接器

  1. https://github.com/Azure/toketi-kafka-connect-iothub/ 將連接器的來源下載到您的本機環境。

  2. 如需命令提示字元,請瀏覽至 toketi-kafka-connect-iothub-master 目錄。 然後使用下列命令建置和封裝專案:

    sbt assembly
    

    建置需要幾分鐘的時間才能完成。 命令會在專案的 toketi-kafka-connect-iothub-master\target\scala-2.11 目錄中建立名為 kafka-connect-iothub-assembly_2.11-0.7.0.jar 的檔案。

安裝連接器

  1. 將 .jar 檔案上傳至 HDInsight 上 Kafka 叢集的邊緣節點。 以您叢集的實際名稱取代 CLUSTERNAME,以編輯下列命令。 SSH 使用者帳戶和邊緣節點名稱的預設值會視需要修改。

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. 檔案複製完成後,請使用 SSH 連線至邊緣節點:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. 若要將連接器安裝到 Kafkalibs 目錄中,請使用下列命令:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

針對其餘步驟,讓 SSH 連線保持為使用中。

設定 Apache Kafka

透過邊緣節點的 SSH 連線,使用下列步驟設定在獨立模式中執行連接器的 Kafka:

  1. 設定密碼變數。 請將 PASSWORD 取代為叢集登入密碼,然後輸入下列命令:

    export password='PASSWORD'
    
  2. 安裝 jq 公用程式。 jq 有助於處理 Ambari 查詢所傳回的 JSON 文件。 輸入下列命令:

    sudo apt -y install jq
    
  3. 取得 Kafka 訊息代理程式的位址。 您的叢集中可能會有許多訊息代理程式,但您只需要參考其中一或兩個。 若要取得兩個訊息代理程式主機的位址,請使用下列命令:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    複製這些值以供稍後使用。 傳回的值類似下列文字︰

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. 取得 Apache Zookeeper 節點的位址。 叢集中可能會有數個 Zookeeper 節點,但您只需要參考其中一或兩個。 使用下列命令將位址儲存在變數 KAFKAZKHOSTS 中:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. 當您以獨立模式執行連接器時, /usr/hdp/current/kafka-broker/config/connect-standalone.properties 檔案會用來與 Kafka 訊息代理程式通訊。 若要編輯 connect-standalone.properties 檔案,請使用下列命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. 進行下列編輯:

    目前的值 新值 註解
    bootstrap.servers=localhost:9092 localhost:9092 值取代為先前步驟中的訊息代理程式主機 設定為讓邊緣節點找出 Kafka 訊息代理程式的獨立組態。
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter 此變更可讓您使用隨附於 Kafka 的主控台產生者進行測試。 對於其他產生者和取用者,您可能需要不同的轉換器。 如需使用其他轉換器值的相關資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter 與指定相同。
    N/A consumer.max.poll.records=10 新增檔案結尾。 此變更可將接收連接器限定為一次 10 筆記錄以內,以防止連接器逾時。 如需詳細資訊,請參閱https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md
  7. 若要儲存檔案,請使用 Ctrl + XYEnter 鍵。

  8. 若要建立連接器所使用的主題,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    若要確認 iotiniotout 主題存在,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    iotin 主題可用來接收來自 IoT 中樞的訊息。 iotout 主題可用來將訊息傳送至 IoT 中樞。

取得 IoT 中樞連線資訊

若要擷取連接器所使用 IoT 中樞資訊,請使用下列步驟:

  1. 取得 IoT 中樞的事件中樞相容端點和事件中樞相容端點名稱。 若要取得這項資訊,請使用下列其中一個方法:

    • Azure 入口網站,使用下列步驟:

      1. 瀏覽至您的 IoT 中樞,並選取 [端點]

      2. 在 [內建端點] 中,選取 [事件]

      3. 在 [屬性] 中,複製下列欄位的值:

        • 事件中樞相容名稱
        • 事件中樞相容端點
        • 資料分割

        重要

        入口網站中的端點值可能包含在此範例中不需要的多餘文字。 請擷取符合 sb://<randomnamespace>.servicebus.windows.net/ 模式的文字。

    • Azure CLI,使用下列命令:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      請將 myhubname 取代為您的 IoT 中樞名稱。 回應會類似於下列文字:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. 取得共用存取原則金鑰。 在此範例中,請使用服務金鑰。 若要取得這項資訊,請使用下列其中一個方法:

    • Azure 入口網站,使用下列步驟:

      1. 選取 [共用存取原則],然後選取 [服務]
      2. 複製 [主要金鑰] 值。
      3. 複製 [連接字串 – 主要金鑰] 的值。
    • Azure CLI,使用下列命令:

      1. 若要取得主要金鑰值,請使用下列命令:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        請將 myhubname 取代為您的 IoT 中樞名稱。 回應是此中樞之 service 原則的主要金鑰。

      2. 若要取得 service 原則的連接字串,請使用下列命令:

        az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
        

        請將 myhubname 取代為您的 IoT 中樞名稱。 回應是 service 原則的連接字串。

設定來源連線

若要設定使用 IoT 中樞時的來源,請透過邊緣節點的 SSH 連線執行下列動作:

  1. /usr/hdp/current/kafka-broker/config/ 目錄中建立 connect-iot-source.properties 檔案的複本。 若要從 toketi-kafka-connect-iothub 專案下載檔案,請使用下列命令:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. 若要編輯 connect-iot-source.properties 檔案並新增 IoT 中樞資訊,請使用下列命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. 在編輯器中找出下列項目,並加以變更:

    目前的值 編輯
    Kafka.Topic=PLACEHOLDER PLACEHOLDER 替換為 iotin。 從 IoT 中樞接收到的訊息會放在 iotin 主題中。
    IotHub.EventHubCompatibleName=PLACEHOLDER PLACEHOLDER 取代為事件中樞相容名稱。
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER PLACEHOLDER 取代為事件中樞相容端點。
    IotHub.AccessKeyName=PLACEHOLDER PLACEHOLDER 替換為 service
    IotHub.AccessKeyValue=PLACEHOLDER PLACEHOLDER 取代為 service 原則的主要金鑰。
    IotHub.Partitions=PLACEHOLDER PLACEHOLDER 取代為前述步驟中的分割區數目。
    IotHub.StartTime=PLACEHOLDER PLACEHOLDER 取代為 UTC 日期。 此日期是連接器開始檢查訊息的時間。 日期格式為 yyyy-mm-ddThh:mm:ssZ
    BatchSize=100 100 替換為 5。 此變更會使連接器在 IoT 中樞內有五個新訊息之後,將訊息讀取到 Kafka 中。

    如需範例設定,請參閱適用於 Azure IoT 中樞 的 Kafka 連線來源連接器

  4. 若要儲存變更,請使用 Ctrl + XYEnter 鍵。

如需關於設定連接器來源的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md

設定接收連線

若要設定使用 IoT 中樞時的接收連線,請透過邊緣節點的 SSH 連線執行下列動作:

  1. /usr/hdp/current/kafka-broker/config/ 目錄中建立 connect-iothub-sink.properties 檔案的複本。 若要從 toketi-kafka-connect-iothub 專案下載檔案,請使用下列命令:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. 若要編輯 connect-iothub-sink.properties 檔案並新增 IoT 中樞資訊,請使用下列命令:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. 在編輯器中找出下列項目,並加以變更:

    目前的值 編輯
    topics=PLACEHOLDER PLACEHOLDER 替換為 iotout。 寫入 iotout 主題的訊息會轉送至 IoT 中樞。
    IotHub.ConnectionString=PLACEHOLDER PLACEHOLDER 取代為 service 原則的連接字串。

    如需範例設定,請參閱適用於 Azure IoT 中樞 的 Kafka 連線接收連接器

  4. 若要儲存變更,請使用 Ctrl + XYEnter 鍵。

如需關於設定連接器接收端的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md

啟動來源連接器

  1. 若要啟動來源連接器,請透過邊緣節點的 SSH 連線使用下列命令:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    連接器啟動後,請從您的裝置將訊息傳送至 IoT 中樞。 連接器從 IoT 中樞讀取訊息並將其儲存於 Kafka 主題時,會將資訊記錄至主控台:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    注意

    當連接器啟動時,您可能會看到幾則警告。 這些警告不會導致從 IoT 中樞接收訊息的問題。

  2. 使用 Ctrl + C 兩次後,過幾分鐘停止連接器。 連接器需要幾分鐘的時間才能停止。

啟動接收連接器

透過邊緣節點的 SSH 連線,使用下列命令在獨立模式中啟動接收連接器:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

連接器執行時,會顯示類似下列文字的資訊:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

注意

當連接器啟動時,您可能會看到幾則警告。 您可以放心地忽略這些警告。

傳送訊息

若要透過連接器傳送訊息,請使用下列步驟:

  1. 開啟 Kafka 叢集的另一個 SSH 工作階段:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 取得新 ssh 工作階段的 Kafka 訊息代理程式位址。 請將 PASSWORD 取代為叢集登入密碼,然後輸入下列命令:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. 若要將訊息傳送至 iotout 主題,請使用下列命令:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    此命令不會使您返回一般 Bash 提示字元。 它會將鍵盤輸入傳送至 iotout 主題。

  4. 若要將訊息傳送至您的裝置,請將 JSON 文件貼到 kafka-console-producer 的 SSH 工作階段中。

    重要

    您必須將 "deviceId" 項目的值設定為您的裝置識別碼。 在下列範例中,裝置會命名為 myDeviceId

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    此 JSON 文件的結構描述在 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md 有更詳細的說明。

如果您使用仿真的Raspberry Pi裝置,而且它正在執行中,裝置會記錄下列訊息。

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

如需使用接收連接器的詳細資訊,請參閱 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md

下一步

在本文件中,您已了解如何使用 Apache Kafka Connect API 在 HDInsight 上啟動 IoT Kafka Connector。 使用下列連結來探索使用 Kafka 的其他方式︰