Använda Apache Kafka i HDInsight med Azure IoT Hub
Lär dig hur du använder Apache Kafka Connect Azure IoT Hub-anslutningsappen för att flytta data mellan Apache Kafka i HDInsight och Azure IoT Hub. I det här dokumentet får du lära dig hur du kör IoT Hub-anslutningsappen från en kantnod i klustret.
Med Kafka Connect API kan du implementera anslutningsappar som kontinuerligt hämtar data till Kafka eller push-överför data från Kafka till ett annat system. Apache Kafka Connect Azure IoT Hub är en anslutningsapp som hämtar data från Azure IoT Hub till Kafka. Den kan också skicka data från Kafka till IoT Hub.
När du hämtar från IoT Hub använder du en källanslutning . När du push-överför till IoT Hub använder du en mottagaranslutning . IoT Hub-anslutningsappen tillhandahåller både käll- och mottagaranslutningarna.
Följande diagram visar dataflödet mellan Azure IoT Hub och Kafka i HDInsight när du använder anslutningsappen.
Mer information om hur du ansluter API finns i https://kafka.apache.org/documentation/#connect.
Förutsättningar
Ett Apache Kafka-kluster i HDInsight. Mer information finns i snabbstartsdokumentet kafka på HDInsight.
En kantnod i Kafka-klustret. Mer information finns i Använda kantnoder med HDInsight-dokument .
En SSH-klient. Mer information finns i Ansluta till HDInsight (Apache Hadoop) med hjälp av SSH.
En Azure IoT Hub och enhet. I den här artikeln bör du överväga att använda Connect Raspberry Pi-onlinesimulatorn till Azure IoT Hub.
Build-verktyg för Scala.
Skapa anslutningsappen
Ladda ned källan för anslutningsappen från https://github.com/Azure/toketi-kafka-connect-iothub/ till din lokala miljö.
Gå till katalogen från
toketi-kafka-connect-iothub-master
en kommandotolk. Använd sedan följande kommando för att skapa och paketera projektet:sbt assembly
Det tar några minuter att slutföra bygget. Kommandot skapar en fil med namnet
kafka-connect-iothub-assembly_2.11-0.7.0.jar
itoketi-kafka-connect-iothub-master\target\scala-2.11
katalogen för projektet.
Installera anslutningsappen
Ladda upp .jar-filen till kantnoden för din Kafka i HDInsight-klustret. Redigera följande kommando genom att
CLUSTERNAME
ersätta med det faktiska namnet på klustret. Standardvärdena för SSH-användarkontot och namnet på gränsnoden används för att ändra efter behov.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
När filkopian är klar ansluter du till kantnoden med hjälp av SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Om du vill installera anslutningsappen i Kafka-katalogen
libs
använder du följande kommando:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Håll SSH-anslutningen aktiv för de återstående stegen.
Konfigurera Apache Kafka
Från SSH-anslutningen till gränsnoden använder du följande steg för att konfigurera Kafka att köra anslutningsappen i fristående läge:
Konfigurera lösenordsvariabel. Ersätt LÖSENORD med lösenordet för klusterinloggning och ange sedan kommandot:
export password='PASSWORD'
Installera jq-verktyget. jq gör det enklare att bearbeta JSON-dokument som returneras från Ambari-frågor. Ange följande kommando:
sudo apt -y install jq
Hämta adressen till Kafka-mäklarna. Det kan finnas många koordinatorer i klustret, men du behöver bara referera till en eller två. Använd följande kommando för att hämta adressen till två broker-värdar:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Kopiera värdena för senare användning. Det värde som genereras liknar följande text:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Hämta adressen till Apache Zookeeper-noderna. Det finns flera Zookeeper-noder i klustret, men du behöver bara referera till en eller två. Använd följande kommando för att lagra adresserna i variabeln
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
När du kör anslutningsappen i fristående läge
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
används filen för att kommunicera med Kafka-koordinatorerna. Om du vill redigeraconnect-standalone.properties
filen använder du följande kommando:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Gör följande ändringar:
Aktuellt värde Nytt värde Kommentar bootstrap.servers=localhost:9092
Ersätt värdet localhost:9092
med koordinatorvärdarna från föregående stegKonfigurerar den fristående konfigurationen för gränsnoden för att hitta Kafka-koordinatorerna. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Med den här ändringen kan du testa med hjälp av konsolproducenten som ingår i Kafka. Du kan behöva olika konverterare för andra producenter och konsumenter. Information om hur du använder andra konverterarvärden finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Samma som anges. Ej tillämpligt consumer.max.poll.records=10
Lägg till i slutet av filen. Den här ändringen är att förhindra tidsgränser i mottagaranslutningen genom att begränsa den till 10 poster i taget. Mer information finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Spara filen genom att använda Ctrl + X, Y och sedan Retur.
Om du vill skapa de ämnen som används av anslutningsappen använder du följande kommandon:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Kontrollera att ämnena
iotin
ochiotout
finns med följande kommando:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Ämnet
iotin
används för att ta emot meddelanden från IoT Hub. Ämnetiotout
används för att skicka meddelanden till IoT Hub.
Hämta IoT Hub-anslutningsinformation
Använd följande steg för att hämta IoT Hub-information som används av anslutningsappen:
Hämta det Event Hub-kompatibla slutpunkten och det Event Hub-kompatibla slutpunktsnamnet för din IoT-hubb. Använd någon av följande metoder för att hämta den här informationen:
Använd följande steg från Azure Portal:
Gå till din IoT Hub och välj Slutpunkter.
Välj Händelser från inbyggda slutpunkter.
Kopiera värdet för följande fält från Egenskaper:
- Event Hub-kompatibelt namn
- Event Hub-kompatibel slutpunkt
- Partitioner
Viktigt!
Slutpunktsvärdet från portalen kan innehålla extra text som inte behövs i det här exemplet. Extrahera texten som matchar det här mönstret
sb://<randomnamespace>.servicebus.windows.net/
.
Använd följande kommando från Azure CLI:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Ersätt
myhubname
med namnet på din IoT-hubb. Svaret liknar följande text:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Hämta principen och nyckeln för delad åtkomst. I det här exemplet använder du tjänstnyckeln. Använd någon av följande metoder för att hämta den här informationen:
Använd följande steg från Azure Portal:
- Välj Principer för delad åtkomst och välj sedan tjänst.
- Kopiera primärnyckelvärdet.
- Kopiera anslutningssträngen – primärnyckelvärde .
Använd följande kommando från Azure CLI:
Använd följande kommando för att hämta primärnyckelvärdet:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Ersätt
myhubname
med namnet på din IoT-hubb. Svaret är den primära nyckeln tillservice
principen för den här hubben.Använd följande kommando för att hämta anslutningssträng för
service
principen:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Ersätt
myhubname
med namnet på din IoT-hubb. Svaret är anslutningssträng förservice
principen.
Konfigurera källanslutningen
Utför följande åtgärder från en SSH-anslutning till gränsnoden för att konfigurera källan att fungera med din IoT Hub:
Skapa en kopia av
connect-iot-source.properties
filen i/usr/hdp/current/kafka-broker/config/
katalogen. Om du vill ladda ned filen från projektet toketi-kafka-connect-iothub använder du följande kommando:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Om du vill redigera
connect-iot-source.properties
filen och lägga till IoT Hub-informationen använder du följande kommando:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Leta upp och ändra följande poster i redigeringsprogrammet:
Aktuellt värde Redigera Kafka.Topic=PLACEHOLDER
Ersätt PLACEHOLDER
mediotin
Meddelanden som tas emot från IoT Hub placeras i ämnetiotin
.IotHub.EventHubCompatibleName=PLACEHOLDER
Ersätt PLACEHOLDER
med det Event Hub-kompatibla namnet.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Ersätt PLACEHOLDER
med den Event Hub-kompatibla slutpunkten.IotHub.AccessKeyName=PLACEHOLDER
Ersätt PLACEHOLDER
medservice
IotHub.AccessKeyValue=PLACEHOLDER
Ersätt PLACEHOLDER
med principensservice
primära nyckel.IotHub.Partitions=PLACEHOLDER
Ersätt PLACEHOLDER
med antalet partitioner från föregående steg.IotHub.StartTime=PLACEHOLDER
Ersätt PLACEHOLDER
med ett UTC-datum. Det här datumet är när anslutningsappen börjar söka efter meddelanden. Datumformatet äryyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Ersätt 100
med5
Den här ändringen gör att anslutningsappen läser meddelanden till Kafka när det finns fem nya meddelanden i IoT Hub.Ett exempel på en konfiguration finns i Kafka Connect Source Connector för Azure IoT Hub.
Spara ändringarna genom att använda Ctrl + X, Y och sedan Retur.
Mer information om hur du konfigurerar anslutningskällan finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Konfigurera mottagaranslutningen
Utför följande åtgärder från en SSH-anslutning till gränsnoden för att konfigurera mottagaranslutningen så att den fungerar med din IoT Hub:
Skapa en kopia av
connect-iothub-sink.properties
filen i/usr/hdp/current/kafka-broker/config/
katalogen. Om du vill ladda ned filen från projektet toketi-kafka-connect-iothub använder du följande kommando:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Om du vill redigera
connect-iothub-sink.properties
filen och lägga till IoT Hub-informationen använder du följande kommando:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Leta upp och ändra följande poster i redigeringsprogrammet:
Aktuellt värde Redigera topics=PLACEHOLDER
Ersätt PLACEHOLDER
mediotout
Meddelanden som skrivs tilliotout
ämnet vidarebefordras till IoT-hubben.IotHub.ConnectionString=PLACEHOLDER
Ersätt PLACEHOLDER
med principens anslutningssträngservice
.Ett exempel på en konfiguration finns i Kafka Connect Sink Connector för Azure IoT Hub.
Spara ändringarna genom att använda Ctrl + X, Y och sedan Retur.
Mer information om hur du konfigurerar anslutningsmottagaren finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Starta källanslutningsappen
Starta källanslutningsappen genom att använda följande kommando från en SSH-anslutning till gränsnoden:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
När anslutningsappen startar skickar du meddelanden till IoT Hub från dina enheter. När anslutningsappen läser meddelanden från IoT-hubben och lagrar dem i Kafka-ämnet loggar den information till konsolen:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Kommentar
Du kan se flera varningar när anslutningsappen startar. Dessa varningar orsakar inte problem med att ta emot meddelanden från IoT Hub.
Stoppa anslutningsappen efter några minuter med Ctrl + C två gånger. Det tar några minuter innan anslutningsappen stoppas.
Starta mottagaranslutningen
Från en SSH-anslutning till gränsnoden använder du följande kommando för att starta mottagaranslutningen i fristående läge:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
När anslutningsappen körs visas information som liknar följande text:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Kommentar
Du kan märka flera varningar när anslutningsappen startas. Du kan ignorera dem.
Skicka meddelanden
Använd följande steg för att skicka meddelanden via anslutningsappen:
Öppna en andra SSH-session till Kafka-klustret:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Hämta adressen till Kafka-koordinatorerna för den nya SSH-sessionen. Ersätt LÖSENORD med lösenordet för klusterinloggning och ange sedan kommandot:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Om du vill skicka meddelanden till ämnet
iotout
använder du följande kommando:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Det här kommandot returnerar inte den normala Bash-prompten. I stället skickar den tangentbordsindata till ämnet
iotout
.Om du vill skicka ett meddelande till enheten klistrar du in ett JSON-dokument i SSH-sessionen för
kafka-console-producer
.Viktigt!
Du måste ange värdet för
"deviceId"
posten till enhetens ID. I följande exempel hetermyDeviceId
enheten :{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Schemat för det här JSON-dokumentet beskrivs mer detaljerat på https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Om du använder den simulerade Raspberry Pi-enheten och den körs loggar enheten följande meddelande.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Mer information om hur du använder mottagaranslutningsappen finns i https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Nästa steg
I det här dokumentet har du lärt dig hur du använder Apache Kafka Connect API för att starta IoT Kafka Connector i HDInsight. Använd följande länkar för att identifiera andra sätt att arbeta med Kafka: