Delen via


Apache Kafka gebruiken in HDInsight met Azure IoT Hub

Meer informatie over het gebruik van de Apache Kafka Connect Azure IoT Hub-connector om gegevens te verplaatsen tussen Apache Kafka in HDInsight en Azure IoT Hub. In dit document leert u hoe u de IoT Hub-connector uitvoert vanaf een Edge-knooppunt in het cluster.

Met de Kafka Connect-API kunt u connectors implementeren die continu gegevens naar Kafka halen of gegevens van Kafka naar een ander systeem pushen. Apache Kafka Connect Azure IoT Hub is een connector die gegevens uit Azure IoT Hub naar Kafka haalt. Het kan ook gegevens van Kafka naar de IoT Hub pushen.

Bij het ophalen van de IoT Hub gebruikt u een bronconnector . Wanneer u pusht naar IoT Hub, gebruikt u een sink-connector . De IoT Hub-connector biedt zowel de bron- als sinkconnectors.

In het volgende diagram ziet u de gegevensstroom tussen Azure IoT Hub en Kafka in HDInsight wanneer u de connector gebruikt.

Afbeelding van gegevens die stromen van IoT Hub naar Kafka via de connector.

Zie https://kafka.apache.org/documentation/#connectvoor meer informatie over het verbinden van api's.

Vereisten

De connector bouwen

  1. Download de bron voor de connector van https://github.com/Azure/toketi-kafka-connect-iothub/ naar uw lokale omgeving.

  2. Navigeer vanuit een opdrachtprompt naar de toketi-kafka-connect-iothub-master map. Gebruik vervolgens de volgende opdracht om het project te bouwen en te verpakken:

    sbt assembly
    

    Het duurt enkele minuten voordat de build is voltooid. Met de opdracht maakt u een bestand met de naam kafka-connect-iothub-assembly_2.11-0.7.0.jar in de toketi-kafka-connect-iothub-master\target\scala-2.11 map voor het project.

Installeer de connector

  1. Upload het .jar-bestand naar het edge-knooppunt van uw Kafka-cluster in HDInsight. Bewerk de volgende opdracht door de werkelijke naam van uw cluster te vervangen CLUSTERNAME . De standaardwaarden voor het SSH-gebruikersaccount en de naam van het Edge-knooppunt worden gebruikt om indien nodig te wijzigen.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Zodra het kopiƫren van het bestand is voltooid, maakt u verbinding met het Edge-knooppunt met behulp van SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Gebruik de volgende opdracht om de connector te installeren in de Kafka-map libs :

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Houd uw SSH-verbinding actief voor de resterende stappen.

Apache Kafka configureren

Gebruik vanuit uw SSH-verbinding met het Edge-knooppunt de volgende stappen om Kafka te configureren om de connector uit te voeren in de zelfstandige modus:

  1. wachtwoordvariabele instellen. Vervang PASSWORD door het wachtwoord voor het aanmelden bij het cluster en voer vervolgens de opdracht in:

    export password='PASSWORD'
    
  2. Installeer het hulpprogramma jq . jq maakt het eenvoudiger om JSON-documenten te verwerken die worden geretourneerd door Ambari-query's. Voer de volgende opdracht in:

    sudo apt -y install jq
    
  3. Haal het adres van de Kafka-brokers op. Er zijn mogelijk veel brokers in uw cluster, maar u hoeft slechts naar een of twee te verwijzen. Gebruik de volgende opdracht om het adres van twee brokerhosts op te halen:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Kopieer de waarden voor later gebruik. De geretourneerde waarde ziet er ongeveer zo uit:

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Haal het adres op van de Apache Zookeeper-knooppunten. Er zijn verschillende Zookeeper-knooppunten in het cluster, maar u hoeft slechts naar een of twee te verwijzen. Gebruik de volgende opdracht om de adressen op te slaan in de variabele KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. Wanneer u de connector uitvoert in de zelfstandige modus, wordt het /usr/hdp/current/kafka-broker/config/connect-standalone.properties bestand gebruikt om te communiceren met de Kafka-brokers. Gebruik de volgende opdracht om het connect-standalone.properties bestand te bewerken:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Voer de volgende bewerkingen uit:

    Huidige waarde Nieuwe waarde Opmerking
    bootstrap.servers=localhost:9092 Vervang de localhost:9092 waarde door de brokerhosts uit de vorige stap Hiermee configureert u de zelfstandige configuratie voor het Edge-knooppunt om de Kafka-brokers te vinden.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Met deze wijziging kunt u testen met behulp van de consoleproducent die is opgenomen in Kafka. Mogelijk hebt u verschillende conversieprogramma's nodig voor andere producenten en consumenten. Zie voor meer informatie over het gebruik van andere conversieprogrammawaarden https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter Hetzelfde als gegeven.
    N.v.t. consumer.max.poll.records=10 Toevoegen aan het einde van het bestand. Deze wijziging is om time-outs in de sinkconnector te voorkomen door deze te beperken tot 10 records tegelijk. Zie https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md voor meer informatie.
  7. Als u het bestand wilt opslaan, gebruikt u Ctrl+ X, Y en vervolgens Enter.

  8. Gebruik de volgende opdrachten om de onderwerpen te maken die door de connector worden gebruikt:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Gebruik de volgende opdracht om te controleren of de iotin ene iotout onderwerpen bestaan:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Het iotin onderwerp wordt gebruikt voor het ontvangen van berichten van IoT Hub. Het iotout onderwerp wordt gebruikt voor het verzenden van berichten naar IoT Hub.

IoT Hub-verbindingsgegevens ophalen

Voer de volgende stappen uit om ioT-hubgegevens op te halen die door de connector worden gebruikt:

  1. Haal het event hub-compatibele eindpunt en de event hub-compatibele eindpuntnaam op voor uw IoT-hub. Gebruik een van de volgende methoden om deze informatie op te halen:

    • Gebruik in Azure Portal de volgende stappen:

      1. Navigeer naar uw IoT Hub en selecteer Eindpunten.

      2. Selecteer Gebeurtenissen in ingebouwde eindpunten.

      3. Kopieer vanuit Eigenschappen de waarde van de volgende velden:

        • Naam die compatibel is met Event Hub
        • Event Hub-compatibel eindpunt
        • Partities

        Belangrijk

        De eindpuntwaarde van de portal kan extra tekst bevatten die niet nodig is in dit voorbeeld. Pak de tekst uit die overeenkomt met dit patroon sb://<randomnamespace>.servicebus.windows.net/.

    • Gebruik vanuit de Azure CLI de volgende opdracht:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Vervang door myhubname de naam van uw IoT-hub. Het antwoord is vergelijkbaar met de volgende tekst:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Haal het beleid en de sleutel voor gedeelde toegang op. Gebruik voor dit voorbeeld de servicesleutel . Gebruik een van de volgende methoden om deze informatie op te halen:

    • Gebruik in Azure Portal de volgende stappen:

      1. Selecteer Beleid voor gedeelde toegang en selecteer vervolgens de service.
      2. Kopieer de waarde van de primaire sleutel .
      3. Kopieer de waarde van de verbindingsreeks- en primaire sleutel .
    • Gebruik vanuit de Azure CLI de volgende opdracht:

      1. Gebruik de volgende opdracht om de waarde van de primaire sleutel op te halen:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Vervang door myhubname de naam van uw IoT-hub. Het antwoord is de primaire sleutel voor het service beleid voor deze hub.

      2. Gebruik de volgende opdracht om de verbindingsreeks voor het service beleid op te halen:

        az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
        

        Vervang door myhubname de naam van uw IoT-hub. Het antwoord is de verbindingsreeks voor het service beleid.

De bronverbinding configureren

Als u de bron wilt configureren voor gebruik met uw IoT Hub, moet u de volgende acties uitvoeren vanuit een SSH-verbinding met het edge-knooppunt:

  1. Maak een kopie van het connect-iot-source.properties bestand in de /usr/hdp/current/kafka-broker/config/ map. Gebruik de volgende opdracht om het bestand te downloaden van het project toketi-kafka-connect-iothub:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Gebruik de volgende opdracht om het connect-iot-source.properties bestand te bewerken en de IoT-hubgegevens toe te voegen:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. Zoek en wijzig de volgende vermeldingen in de editor:

    Huidige waarde Bewerken
    Kafka.Topic=PLACEHOLDER Vervang PLACEHOLDER door iotin. Berichten die zijn ontvangen van IoT Hub, worden in het iotin onderwerp geplaatst.
    IotHub.EventHubCompatibleName=PLACEHOLDER Vervang door PLACEHOLDER de naam die compatibel is met Event Hub.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER Vervang PLACEHOLDER door het Event Hub-compatibele eindpunt.
    IotHub.AccessKeyName=PLACEHOLDER Vervang PLACEHOLDER door service.
    IotHub.AccessKeyValue=PLACEHOLDER Vervang door PLACEHOLDER de primaire sleutel van het service beleid.
    IotHub.Partitions=PLACEHOLDER Vervang PLACEHOLDER door het aantal partities uit de vorige stappen.
    IotHub.StartTime=PLACEHOLDER Vervang door PLACEHOLDER een UTC-datum. Deze datum is wanneer de connector begint te controleren op berichten. De datumnotatie is yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Vervang 100 door 5. Deze wijziging zorgt ervoor dat de connector berichten in Kafka leest zodra er vijf nieuwe berichten in IoT Hub zijn.

    Zie Kafka Connect Source Connector voor Azure IoT Hub voor een voorbeeldconfiguratie.

  4. Als u wijzigingen wilt opslaan, gebruikt u Ctrl+ X, Y en vervolgens Enter.

Zie https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.mdvoor meer informatie over het configureren van de connectorbron.

De sinkverbinding configureren

Als u de sinkverbinding wilt configureren om met uw IoT Hub te werken, moet u de volgende acties uitvoeren vanuit een SSH-verbinding met het edge-knooppunt:

  1. Maak een kopie van het connect-iothub-sink.properties bestand in de /usr/hdp/current/kafka-broker/config/ map. Gebruik de volgende opdracht om het bestand te downloaden van het project toketi-kafka-connect-iothub:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Gebruik de volgende opdracht om het connect-iothub-sink.properties bestand te bewerken en de IoT-hubgegevens toe te voegen:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. Zoek en wijzig de volgende vermeldingen in de editor:

    Huidige waarde Bewerken
    topics=PLACEHOLDER Vervang PLACEHOLDER door iotout. Berichten die naar iotout het onderwerp worden geschreven, worden doorgestuurd naar de IoT-hub.
    IotHub.ConnectionString=PLACEHOLDER Vervang PLACEHOLDER door de verbindingsreeks voor het service beleid.

    Zie Kafka Connect Sink Connector voor Azure IoT Hub voor een voorbeeldconfiguratie.

  4. Als u wijzigingen wilt opslaan, gebruikt u Ctrl+ X, Y en vervolgens Enter.

Zie voor meer informatie over het configureren van de connector-sink https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

De bronconnector starten

  1. Gebruik de volgende opdracht vanuit een SSH-verbinding met het Edge-knooppunt om de bronconnector te starten:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    Zodra de connector is gestart, verzendt u berichten naar IoT Hub vanaf uw apparaat(en). Wanneer de connector berichten van de IoT-hub leest en opslaat in het Kafka-onderwerp, worden gegevens in de console geregistreerd:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Notitie

    Mogelijk ziet u verschillende waarschuwingen wanneer de connector wordt gestart. Deze waarschuwingen veroorzaken geen problemen met het ontvangen van berichten van IoT Hub.

  2. Stop de verbindingslijn na een paar minuten met Ctrl +C . Het duurt enkele minuten voordat de connector is gestopt.

De sinkconnector starten

Gebruik vanuit een SSH-verbinding met het Edge-knooppunt de volgende opdracht om de sink-connector in de zelfstandige modus te starten:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

Terwijl de connector wordt uitgevoerd, wordt informatie weergegeven die vergelijkbaar is met de volgende tekst:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Notitie

U ziet mogelijk verschillende waarschuwingen wanneer de connector wordt gestart. U kunt deze gewoon negeren.

Berichten verzenden

Gebruik de volgende stappen om berichten via de connector te verzenden:

  1. Open een tweede SSH-sessie naar het Kafka-cluster:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Haal het adres op van de Kafka-brokers voor de nieuwe ssh-sessie. Vervang PASSWORD door het wachtwoord voor het aanmelden bij het cluster en voer vervolgens de opdracht in:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Gebruik de volgende opdracht om berichten naar het iotout onderwerp te verzenden:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Met deze opdracht keert u niet terug naar de normale Bash-prompt. In plaats daarvan verzendt het toetsenbordinvoer naar het iotout onderwerp.

  4. Als u een bericht naar uw apparaat wilt verzenden, plakt u een JSON-document in de SSH-sessie voor de kafka-console-producer.

    Belangrijk

    U moet de waarde van de "deviceId" vermelding instellen op de id van uw apparaat. In het volgende voorbeeld heeft het apparaat de naam myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Het schema voor dit JSON-document wordt in meer detail beschreven op https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Als u het gesimuleerde Raspberry Pi-apparaat gebruikt en het wordt uitgevoerd, registreert het apparaat het volgende bericht.

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Zie voor meer informatie over het gebruik van de sinkconnector https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Volgende stappen

In dit document hebt u geleerd hoe u de Apache Kafka Connect-API gebruikt om de IoT Kafka-connector in HDInsight te starten. Gebruik de volgende koppelingen om andere manieren te ontdekken om met Kafka te werken: