Apache Kafka gebruiken in HDInsight met Azure IoT Hub
Meer informatie over het gebruik van de Apache Kafka Connect Azure IoT Hub-connector om gegevens te verplaatsen tussen Apache Kafka in HDInsight en Azure IoT Hub. In dit document leert u hoe u de IoT Hub-connector uitvoert vanaf een Edge-knooppunt in het cluster.
Met de Kafka Connect-API kunt u connectors implementeren die continu gegevens naar Kafka halen of gegevens van Kafka naar een ander systeem pushen. Apache Kafka Connect Azure IoT Hub is een connector die gegevens uit Azure IoT Hub naar Kafka haalt. Het kan ook gegevens van Kafka naar de IoT Hub pushen.
Bij het ophalen van de IoT Hub gebruikt u een bronconnector . Wanneer u pusht naar IoT Hub, gebruikt u een sink-connector . De IoT Hub-connector biedt zowel de bron- als sinkconnectors.
In het volgende diagram ziet u de gegevensstroom tussen Azure IoT Hub en Kafka in HDInsight wanneer u de connector gebruikt.
Zie https://kafka.apache.org/documentation/#connectvoor meer informatie over het verbinden van api's.
Vereisten
Een Apache Kafka-cluster in HDInsight. Zie het snelstartdocument kafka in HDInsight voor meer informatie.
Een Edge-knooppunt in het Kafka-cluster. Zie Edge-knooppunten gebruiken met HDInsight-document voor meer informatie.
Een SSH-client. Zie voor meer informatie Verbinding maken met HDInsight (Apache Hadoop) via SSH.
Een Azure IoT Hub en een apparaat. Voor dit artikel kunt u de onlinesimulator Connect Raspberry Pi gebruiken voor Azure IoT Hub.
De connector bouwen
Download de bron voor de connector van https://github.com/Azure/toketi-kafka-connect-iothub/ naar uw lokale omgeving.
Navigeer vanuit een opdrachtprompt naar de
toketi-kafka-connect-iothub-master
map. Gebruik vervolgens de volgende opdracht om het project te bouwen en te verpakken:sbt assembly
Het duurt enkele minuten voordat de build is voltooid. Met de opdracht maakt u een bestand met de naam
kafka-connect-iothub-assembly_2.11-0.7.0.jar
in detoketi-kafka-connect-iothub-master\target\scala-2.11
map voor het project.
Installeer de connector
Upload het .jar-bestand naar het edge-knooppunt van uw Kafka-cluster in HDInsight. Bewerk de volgende opdracht door de werkelijke naam van uw cluster te vervangen
CLUSTERNAME
. De standaardwaarden voor het SSH-gebruikersaccount en de naam van het Edge-knooppunt worden gebruikt om indien nodig te wijzigen.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Zodra het kopiƫren van het bestand is voltooid, maakt u verbinding met het Edge-knooppunt met behulp van SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Gebruik de volgende opdracht om de connector te installeren in de Kafka-map
libs
:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Houd uw SSH-verbinding actief voor de resterende stappen.
Apache Kafka configureren
Gebruik vanuit uw SSH-verbinding met het Edge-knooppunt de volgende stappen om Kafka te configureren om de connector uit te voeren in de zelfstandige modus:
wachtwoordvariabele instellen. Vervang PASSWORD door het wachtwoord voor het aanmelden bij het cluster en voer vervolgens de opdracht in:
export password='PASSWORD'
Installeer het hulpprogramma jq . jq maakt het eenvoudiger om JSON-documenten te verwerken die worden geretourneerd door Ambari-query's. Voer de volgende opdracht in:
sudo apt -y install jq
Haal het adres van de Kafka-brokers op. Er zijn mogelijk veel brokers in uw cluster, maar u hoeft slechts naar een of twee te verwijzen. Gebruik de volgende opdracht om het adres van twee brokerhosts op te halen:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Kopieer de waarden voor later gebruik. De geretourneerde waarde ziet er ongeveer zo uit:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Haal het adres op van de Apache Zookeeper-knooppunten. Er zijn verschillende Zookeeper-knooppunten in het cluster, maar u hoeft slechts naar een of twee te verwijzen. Gebruik de volgende opdracht om de adressen op te slaan in de variabele
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Wanneer u de connector uitvoert in de zelfstandige modus, wordt het
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
bestand gebruikt om te communiceren met de Kafka-brokers. Gebruik de volgende opdracht om hetconnect-standalone.properties
bestand te bewerken:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Voer de volgende bewerkingen uit:
Huidige waarde Nieuwe waarde Opmerking bootstrap.servers=localhost:9092
Vervang de localhost:9092
waarde door de brokerhosts uit de vorige stapHiermee configureert u de zelfstandige configuratie voor het Edge-knooppunt om de Kafka-brokers te vinden. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Met deze wijziging kunt u testen met behulp van de consoleproducent die is opgenomen in Kafka. Mogelijk hebt u verschillende conversieprogramma's nodig voor andere producenten en consumenten. Zie voor meer informatie over het gebruik van andere conversieprogrammawaarden https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Hetzelfde als gegeven. N.v.t. consumer.max.poll.records=10
Toevoegen aan het einde van het bestand. Deze wijziging is om time-outs in de sinkconnector te voorkomen door deze te beperken tot 10 records tegelijk. Zie https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md voor meer informatie. Als u het bestand wilt opslaan, gebruikt u Ctrl+ X, Y en vervolgens Enter.
Gebruik de volgende opdrachten om de onderwerpen te maken die door de connector worden gebruikt:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Gebruik de volgende opdracht om te controleren of de
iotin
eneiotout
onderwerpen bestaan:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Het
iotin
onderwerp wordt gebruikt voor het ontvangen van berichten van IoT Hub. Hetiotout
onderwerp wordt gebruikt voor het verzenden van berichten naar IoT Hub.
IoT Hub-verbindingsgegevens ophalen
Voer de volgende stappen uit om ioT-hubgegevens op te halen die door de connector worden gebruikt:
Haal het event hub-compatibele eindpunt en de event hub-compatibele eindpuntnaam op voor uw IoT-hub. Gebruik een van de volgende methoden om deze informatie op te halen:
Gebruik in Azure Portal de volgende stappen:
Navigeer naar uw IoT Hub en selecteer Eindpunten.
Selecteer Gebeurtenissen in ingebouwde eindpunten.
Kopieer vanuit Eigenschappen de waarde van de volgende velden:
- Naam die compatibel is met Event Hub
- Event Hub-compatibel eindpunt
- Partities
Belangrijk
De eindpuntwaarde van de portal kan extra tekst bevatten die niet nodig is in dit voorbeeld. Pak de tekst uit die overeenkomt met dit patroon
sb://<randomnamespace>.servicebus.windows.net/
.
Gebruik vanuit de Azure CLI de volgende opdracht:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Vervang door
myhubname
de naam van uw IoT-hub. Het antwoord is vergelijkbaar met de volgende tekst:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Haal het beleid en de sleutel voor gedeelde toegang op. Gebruik voor dit voorbeeld de servicesleutel . Gebruik een van de volgende methoden om deze informatie op te halen:
Gebruik in Azure Portal de volgende stappen:
- Selecteer Beleid voor gedeelde toegang en selecteer vervolgens de service.
- Kopieer de waarde van de primaire sleutel .
- Kopieer de waarde van de verbindingsreeks- en primaire sleutel .
Gebruik vanuit de Azure CLI de volgende opdracht:
Gebruik de volgende opdracht om de waarde van de primaire sleutel op te halen:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Vervang door
myhubname
de naam van uw IoT-hub. Het antwoord is de primaire sleutel voor hetservice
beleid voor deze hub.Gebruik de volgende opdracht om de verbindingsreeks voor het
service
beleid op te halen:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Vervang door
myhubname
de naam van uw IoT-hub. Het antwoord is de verbindingsreeks voor hetservice
beleid.
De bronverbinding configureren
Als u de bron wilt configureren voor gebruik met uw IoT Hub, moet u de volgende acties uitvoeren vanuit een SSH-verbinding met het edge-knooppunt:
Maak een kopie van het
connect-iot-source.properties
bestand in de/usr/hdp/current/kafka-broker/config/
map. Gebruik de volgende opdracht om het bestand te downloaden van het project toketi-kafka-connect-iothub:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Gebruik de volgende opdracht om het
connect-iot-source.properties
bestand te bewerken en de IoT-hubgegevens toe te voegen:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Zoek en wijzig de volgende vermeldingen in de editor:
Huidige waarde Bewerken Kafka.Topic=PLACEHOLDER
Vervang PLACEHOLDER
dooriotin
. Berichten die zijn ontvangen van IoT Hub, worden in hetiotin
onderwerp geplaatst.IotHub.EventHubCompatibleName=PLACEHOLDER
Vervang door PLACEHOLDER
de naam die compatibel is met Event Hub.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Vervang PLACEHOLDER
door het Event Hub-compatibele eindpunt.IotHub.AccessKeyName=PLACEHOLDER
Vervang PLACEHOLDER
doorservice
.IotHub.AccessKeyValue=PLACEHOLDER
Vervang door PLACEHOLDER
de primaire sleutel van hetservice
beleid.IotHub.Partitions=PLACEHOLDER
Vervang PLACEHOLDER
door het aantal partities uit de vorige stappen.IotHub.StartTime=PLACEHOLDER
Vervang door PLACEHOLDER
een UTC-datum. Deze datum is wanneer de connector begint te controleren op berichten. De datumnotatie isyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Vervang 100
door5
. Deze wijziging zorgt ervoor dat de connector berichten in Kafka leest zodra er vijf nieuwe berichten in IoT Hub zijn.Zie Kafka Connect Source Connector voor Azure IoT Hub voor een voorbeeldconfiguratie.
Als u wijzigingen wilt opslaan, gebruikt u Ctrl+ X, Y en vervolgens Enter.
Zie https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.mdvoor meer informatie over het configureren van de connectorbron.
De sinkverbinding configureren
Als u de sinkverbinding wilt configureren om met uw IoT Hub te werken, moet u de volgende acties uitvoeren vanuit een SSH-verbinding met het edge-knooppunt:
Maak een kopie van het
connect-iothub-sink.properties
bestand in de/usr/hdp/current/kafka-broker/config/
map. Gebruik de volgende opdracht om het bestand te downloaden van het project toketi-kafka-connect-iothub:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Gebruik de volgende opdracht om het
connect-iothub-sink.properties
bestand te bewerken en de IoT-hubgegevens toe te voegen:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Zoek en wijzig de volgende vermeldingen in de editor:
Huidige waarde Bewerken topics=PLACEHOLDER
Vervang PLACEHOLDER
dooriotout
. Berichten die naariotout
het onderwerp worden geschreven, worden doorgestuurd naar de IoT-hub.IotHub.ConnectionString=PLACEHOLDER
Vervang PLACEHOLDER
door de verbindingsreeks voor hetservice
beleid.Zie Kafka Connect Sink Connector voor Azure IoT Hub voor een voorbeeldconfiguratie.
Als u wijzigingen wilt opslaan, gebruikt u Ctrl+ X, Y en vervolgens Enter.
Zie voor meer informatie over het configureren van de connector-sink https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
De bronconnector starten
Gebruik de volgende opdracht vanuit een SSH-verbinding met het Edge-knooppunt om de bronconnector te starten:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Zodra de connector is gestart, verzendt u berichten naar IoT Hub vanaf uw apparaat(en). Wanneer de connector berichten van de IoT-hub leest en opslaat in het Kafka-onderwerp, worden gegevens in de console geregistreerd:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Notitie
Mogelijk ziet u verschillende waarschuwingen wanneer de connector wordt gestart. Deze waarschuwingen veroorzaken geen problemen met het ontvangen van berichten van IoT Hub.
Stop de verbindingslijn na een paar minuten met Ctrl +C . Het duurt enkele minuten voordat de connector is gestopt.
De sinkconnector starten
Gebruik vanuit een SSH-verbinding met het Edge-knooppunt de volgende opdracht om de sink-connector in de zelfstandige modus te starten:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Terwijl de connector wordt uitgevoerd, wordt informatie weergegeven die vergelijkbaar is met de volgende tekst:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Notitie
U ziet mogelijk verschillende waarschuwingen wanneer de connector wordt gestart. U kunt deze gewoon negeren.
Berichten verzenden
Gebruik de volgende stappen om berichten via de connector te verzenden:
Open een tweede SSH-sessie naar het Kafka-cluster:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Haal het adres op van de Kafka-brokers voor de nieuwe ssh-sessie. Vervang PASSWORD door het wachtwoord voor het aanmelden bij het cluster en voer vervolgens de opdracht in:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Gebruik de volgende opdracht om berichten naar het
iotout
onderwerp te verzenden:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Met deze opdracht keert u niet terug naar de normale Bash-prompt. In plaats daarvan verzendt het toetsenbordinvoer naar het
iotout
onderwerp.Als u een bericht naar uw apparaat wilt verzenden, plakt u een JSON-document in de SSH-sessie voor de
kafka-console-producer
.Belangrijk
U moet de waarde van de
"deviceId"
vermelding instellen op de id van uw apparaat. In het volgende voorbeeld heeft het apparaat de naammyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Het schema voor dit JSON-document wordt in meer detail beschreven op https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Als u het gesimuleerde Raspberry Pi-apparaat gebruikt en het wordt uitgevoerd, registreert het apparaat het volgende bericht.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Zie voor meer informatie over het gebruik van de sinkconnector https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Volgende stappen
In dit document hebt u geleerd hoe u de Apache Kafka Connect-API gebruikt om de IoT Kafka-connector in HDInsight te starten. Gebruik de volgende koppelingen om andere manieren te ontdekken om met Kafka te werken: