Dela via


Använda Apache Kafka i HDInsight med Azure IoT Hub

Lär dig hur du använder Apache Kafka Connect Azure IoT Hub-anslutningsappen för att flytta data mellan Apache Kafka i HDInsight och Azure IoT Hub. I det här dokumentet får du lära dig hur du kör IoT Hub-anslutningsappen från en kantnod i klustret.

Med Kafka Connect API kan du implementera anslutningsappar som kontinuerligt hämtar data till Kafka eller push-överför data från Kafka till ett annat system. Apache Kafka Connect Azure IoT Hub är en anslutningsapp som hämtar data från Azure IoT Hub till Kafka. Den kan också skicka data från Kafka till IoT Hub.

När du hämtar från IoT Hub använder du en källanslutning . När du push-överför till IoT Hub använder du en mottagaranslutning . IoT Hub-anslutningsappen tillhandahåller både käll- och mottagaranslutningarna.

Följande diagram visar dataflödet mellan Azure IoT Hub och Kafka i HDInsight när du använder anslutningsappen.

Bild som visar data som flödar från IoT Hub till Kafka via anslutningsappen.

Mer information om hur du ansluter API finns i https://kafka.apache.org/documentation/#connect.

Förutsättningar

Skapa anslutningsappen

  1. Ladda ned källan för anslutningsappen från https://github.com/Azure/toketi-kafka-connect-iothub/ till din lokala miljö.

  2. Gå till katalogen från toketi-kafka-connect-iothub-master en kommandotolk. Använd sedan följande kommando för att skapa och paketera projektet:

    sbt assembly
    

    Det tar några minuter att slutföra bygget. Kommandot skapar en fil med namnet kafka-connect-iothub-assembly_2.11-0.7.0.jar i toketi-kafka-connect-iothub-master\target\scala-2.11 katalogen för projektet.

Installera anslutningsappen

  1. Ladda upp .jar-filen till kantnoden för din Kafka i HDInsight-klustret. Redigera följande kommando genom att CLUSTERNAME ersätta med det faktiska namnet på klustret. Standardvärdena för SSH-användarkontot och namnet på gränsnoden används för att ändra efter behov.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. När filkopian är klar ansluter du till kantnoden med hjälp av SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Om du vill installera anslutningsappen i Kafka-katalogen libs använder du följande kommando:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Håll SSH-anslutningen aktiv för de återstående stegen.

Konfigurera Apache Kafka

Från SSH-anslutningen till gränsnoden använder du följande steg för att konfigurera Kafka att köra anslutningsappen i fristående läge:

  1. Konfigurera lösenordsvariabel. Ersätt LÖSENORD med lösenordet för klusterinloggning och ange sedan kommandot:

    export password='PASSWORD'
    
  2. Installera jq-verktyget. jq gör det enklare att bearbeta JSON-dokument som returneras från Ambari-frågor. Ange följande kommando:

    sudo apt -y install jq
    
  3. Hämta adressen till Kafka-mäklarna. Det kan finnas många koordinatorer i klustret, men du behöver bara referera till en eller två. Använd följande kommando för att hämta adressen till två broker-värdar:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Kopiera värdena för senare användning. Det värde som genereras liknar följande text:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Hämta adressen till Apache Zookeeper-noderna. Det finns flera Zookeeper-noder i klustret, men du behöver bara referera till en eller två. Använd följande kommando för att lagra adresserna i variabeln KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. När du kör anslutningsappen i fristående läge /usr/hdp/current/kafka-broker/config/connect-standalone.properties används filen för att kommunicera med Kafka-koordinatorerna. Om du vill redigera connect-standalone.properties filen använder du följande kommando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Gör följande ändringar:

    Aktuellt värde Nytt värde Kommentar
    bootstrap.servers=localhost:9092 Ersätt värdet localhost:9092 med koordinatorvärdarna från föregående steg Konfigurerar den fristående konfigurationen för gränsnoden för att hitta Kafka-koordinatorerna.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Med den här ändringen kan du testa med hjälp av konsolproducenten som ingår i Kafka. Du kan behöva olika konverterare för andra producenter och konsumenter. Information om hur du använder andra konverterarvärden finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Samma som anges.
    Ej tillämpligt consumer.max.poll.records=10 Lägg till i slutet av filen. Den här ändringen är att förhindra tidsgränser i mottagaranslutningen genom att begränsa den till 10 poster i taget. Mer information finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Spara filen genom att använda Ctrl + X, Y och sedan Retur.

  8. Om du vill skapa de ämnen som används av anslutningsappen använder du följande kommandon:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Kontrollera att ämnena iotin och iotout finns med följande kommando:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Ämnet iotin används för att ta emot meddelanden från IoT Hub. Ämnet iotout används för att skicka meddelanden till IoT Hub.

Hämta IoT Hub-anslutningsinformation

Använd följande steg för att hämta IoT Hub-information som används av anslutningsappen:

  1. Hämta det Event Hub-kompatibla slutpunkten och det Event Hub-kompatibla slutpunktsnamnet för din IoT-hubb. Använd någon av följande metoder för att hämta den här informationen:

    • Använd följande steg från Azure Portal:

      1. Gå till din IoT Hub och välj Slutpunkter.

      2. Välj Händelser från inbyggda slutpunkter.

      3. Kopiera värdet för följande fält från Egenskaper:

        • Event Hub-kompatibelt namn
        • Event Hub-kompatibel slutpunkt
        • Partitioner

        Viktigt!

        Slutpunktsvärdet från portalen kan innehålla extra text som inte behövs i det här exemplet. Extrahera texten som matchar det här mönstret sb://<randomnamespace>.servicebus.windows.net/.

    • Använd följande kommando från Azure CLI:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Ersätt myhubname med namnet på din IoT-hubb. Svaret liknar följande text:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Hämta principen och nyckeln för delad åtkomst. I det här exemplet använder du tjänstnyckeln. Använd någon av följande metoder för att hämta den här informationen:

    • Använd följande steg från Azure Portal:

      1. Välj Principer för delad åtkomst och välj sedan tjänst.
      2. Kopiera primärnyckelvärdet.
      3. Kopiera anslutningssträngen – primärnyckelvärde .
    • Använd följande kommando från Azure CLI:

      1. Använd följande kommando för att hämta primärnyckelvärdet:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Ersätt myhubname med namnet på din IoT-hubb. Svaret är den primära nyckeln till service principen för den här hubben.

      2. Använd följande kommando för att hämta anslutningssträng för service principen:

        az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
        

        Ersätt myhubname med namnet på din IoT-hubb. Svaret är anslutningssträng för service principen.

Konfigurera källanslutningen

Utför följande åtgärder från en SSH-anslutning till gränsnoden för att konfigurera källan att fungera med din IoT Hub:

  1. Skapa en kopia av connect-iot-source.properties filen i /usr/hdp/current/kafka-broker/config/ katalogen. Om du vill ladda ned filen från projektet toketi-kafka-connect-iothub använder du följande kommando:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Om du vill redigera connect-iot-source.properties filen och lägga till IoT Hub-informationen använder du följande kommando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. Leta upp och ändra följande poster i redigeringsprogrammet:

    Aktuellt värde Redigera
    Kafka.Topic=PLACEHOLDER Ersätt PLACEHOLDER med iotin Meddelanden som tas emot från IoT Hub placeras i ämnet iotin .
    IotHub.EventHubCompatibleName=PLACEHOLDER Ersätt PLACEHOLDER med det Event Hub-kompatibla namnet.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Ersätt PLACEHOLDER med den Event Hub-kompatibla slutpunkten.
    IotHub.AccessKeyName=PLACEHOLDER Ersätt PLACEHOLDER med service
    IotHub.AccessKeyValue=PLACEHOLDER Ersätt PLACEHOLDER med principens service primära nyckel.
    IotHub.Partitions=PLACEHOLDER Ersätt PLACEHOLDER med antalet partitioner från föregående steg.
    IotHub.StartTime=PLACEHOLDER Ersätt PLACEHOLDER med ett UTC-datum. Det här datumet är när anslutningsappen börjar söka efter meddelanden. Datumformatet är yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Ersätt 100 med 5 Den här ändringen gör att anslutningsappen läser meddelanden till Kafka när det finns fem nya meddelanden i IoT Hub.

    Ett exempel på en konfiguration finns i Kafka Connect Source Connector för Azure IoT Hub.

  4. Spara ändringarna genom att använda Ctrl + X, Y och sedan Retur.

Mer information om hur du konfigurerar anslutningskällan finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Konfigurera mottagaranslutningen

Utför följande åtgärder från en SSH-anslutning till gränsnoden för att konfigurera mottagaranslutningen så att den fungerar med din IoT Hub:

  1. Skapa en kopia av connect-iothub-sink.properties filen i /usr/hdp/current/kafka-broker/config/ katalogen. Om du vill ladda ned filen från projektet toketi-kafka-connect-iothub använder du följande kommando:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Om du vill redigera connect-iothub-sink.properties filen och lägga till IoT Hub-informationen använder du följande kommando:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. Leta upp och ändra följande poster i redigeringsprogrammet:

    Aktuellt värde Redigera
    topics=PLACEHOLDER Ersätt PLACEHOLDER med iotout Meddelanden som skrivs till iotout ämnet vidarebefordras till IoT-hubben.
    IotHub.ConnectionString=PLACEHOLDER Ersätt PLACEHOLDER med principens anslutningssträngservice.

    Ett exempel på en konfiguration finns i Kafka Connect Sink Connector för Azure IoT Hub.

  4. Spara ändringarna genom att använda Ctrl + X, Y och sedan Retur.

Mer information om hur du konfigurerar anslutningsmottagaren finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Starta källanslutningsappen

  1. Starta källanslutningsappen genom att använda följande kommando från en SSH-anslutning till gränsnoden:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    När anslutningsappen startar skickar du meddelanden till IoT Hub från dina enheter. När anslutningsappen läser meddelanden från IoT-hubben och lagrar dem i Kafka-ämnet loggar den information till konsolen:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Kommentar

    Du kan se flera varningar när anslutningsappen startar. Dessa varningar orsakar inte problem med att ta emot meddelanden från IoT Hub.

  2. Stoppa anslutningsappen efter några minuter med Ctrl + C två gånger. Det tar några minuter innan anslutningsappen stoppas.

Starta mottagaranslutningen

Från en SSH-anslutning till gränsnoden använder du följande kommando för att starta mottagaranslutningen i fristående läge:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

När anslutningsappen körs visas information som liknar följande text:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Kommentar

Du kan märka flera varningar när anslutningsappen startas. Du kan ignorera dem.

Skicka meddelanden

Använd följande steg för att skicka meddelanden via anslutningsappen:

  1. Öppna en andra SSH-session till Kafka-klustret:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Hämta adressen till Kafka-koordinatorerna för den nya SSH-sessionen. Ersätt LÖSENORD med lösenordet för klusterinloggning och ange sedan kommandot:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Om du vill skicka meddelanden till ämnet iotout använder du följande kommando:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Det här kommandot returnerar inte den normala Bash-prompten. I stället skickar den tangentbordsindata till ämnet iotout .

  4. Om du vill skicka ett meddelande till enheten klistrar du in ett JSON-dokument i SSH-sessionen för kafka-console-producer.

    Viktigt!

    Du måste ange värdet för "deviceId" posten till enhetens ID. I följande exempel heter myDeviceIdenheten :

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Schemat för det här JSON-dokumentet beskrivs mer detaljerat på https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Om du använder den simulerade Raspberry Pi-enheten och den körs loggar enheten följande meddelande.

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Mer information om hur du använder mottagaranslutningsappen finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Nästa steg

I det här dokumentet har du lärt dig hur du använder Apache Kafka Connect API för att starta IoT Kafka Connector i HDInsight. Använd följande länkar för att identifiera andra sätt att arbeta med Kafka: