Verwendung von Apache Kafka in HDInsight mit Azure IoT Hub
Hier erfahren Sie, wie Sie mithilfe des Apache Kafka Connect-Azure IoT Hub-Connectors Daten zwischen Apache Kafka in HDInsight und Azure IoT Hub verschieben. In diesem Dokument erfahren Sie, wie der IoT Hub-Connector über einen Edgeknoten im Cluster ausgeführt wird.
Mit der Kafka Connect-API können Sie Connectors implementieren, die kontinuierlich Daten in Kafka abrufen oder Daten aus Kafka an ein anderes System pushen. Apache Kafka Connect Azure IoT Hub ist ein Connector, der Daten aus Azure IoT Hub in Kafka pullt. Dieser kann Daten auch über Kafka mithilfe von Push an IoT Hub übertragen.
Für den Abruf von Daten aus IoT Hub verwenden Sie einen Quellconnector. Für die Übertragung von Daten mithilfe von Push an IoT Hub verwenden Sie einen Senkenconnector. Der IoT Hub-Connector stellt sowohl Quell- als auch Senkenconnector bereit.
In der folgenden Abbildung wird der Datenfluss zwischen Azure IoT Hub und Kafka in HDInsight unter Verwendung des Connectors gezeigt.
Weitere Informationen zur Connect-API finden Sie unter https://kafka.apache.org/documentation/#connect.
Voraussetzungen
Ein Apache Kafka-Cluster in HDInsight. Weitere Informationen finden Sie im Schnellstart: Kafka in HDInsight.
Ein Edgeknoten im Kafka-Cluster. Weitere Informationen finden Sie im Dokument Verwenden von Edgeknoten in HDInsight.
Einen SSH-Client. Weitere Informationen finden Sie unter Herstellen einer Verbindung mit HDInsight (Hadoop) per SSH.
Eine Azure IoT Hub-Instanz und ein Gerät. In Rahmen dieses Artikels sollten Sie das Verbinden des Raspberry Pi-Onlinesimulators mit Azure IoT Hub in Betracht ziehen.
Erstellen des Connectors
Laden Sie die Quelle für den Connector unter https://github.com/Azure/toketi-kafka-connect-iothub/ in Ihre lokale Umgebung herunter.
Navigieren Sie in einer Eingabeaufforderung zum Verzeichnis
toketi-kafka-connect-iothub-master
. Erstellen und packen Sie das Projekt dann mit dem folgenden Befehl:sbt assembly
Die Erstellung kann einige Minuten dauern. Mit diesem Befehl wird die Datei
kafka-connect-iothub-assembly_2.11-0.7.0.jar
im Zielverzeichnistoketi-kafka-connect-iothub-master\target\scala-2.11
für das Projekt erstellt.
Installieren des Connectors
Laden Sie die JAR-Datei auf den Edgeknoten Ihres Kafka in HDInsight-Clusters hoch. Bearbeiten Sie den folgenden Befehl, indem Sie
CLUSTERNAME
durch den tatsächlichen Namen Ihres Clusters ersetzen. Die Standardwerte für das SSH-Benutzerkonto und der Name des Edgeknotens werden verwendet und nach Bedarf geändert.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Stellen Sie nach dem Dateikopiervorgang über SSH eine Verbindung mit dem Edgeknoten her:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Verwenden Sie für die Installation des Connectors im Kafka-Verzeichnis
libs
den folgenden Befehl:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Halten Sie die SSH-Verbindung für die restlichen Schritte aufrecht.
Konfigurieren von Apache Kafka
Führen Sie über Ihre SSH-Verbindung mit dem Edgeknoten die folgenden Schritte durch, um Kafka für die Ausführung des Connectors im eigenständigen Modus zu konfigurieren:
Richten Sie eine Kennwortvariable ein. Ersetzen Sie PASSWORD durch das Kennwort für die Clusteranmeldung, und geben Sie dann den folgenden Befehl ein:
export password='PASSWORD'
Installieren Sie das Hilfsprogramm jq. Durch jq können die von Ambari-Abfragen zurückgegebenen JSON-Dokumente einfacher verarbeitet werden. Geben Sie folgenden Befehl ein:
sudo apt -y install jq
Rufen Sie die Adresse der Kafka-Broker ab. Auch wenn in Ihrem Cluster möglicherweise viele Broker vorhanden sind, müssen Sie nur auf einen oder zwei verweisen. Um die Adresse der zwei Brokerhosts zu ermitteln, verwenden Sie den folgenden Befehl:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Kopieren Sie diese Werte zur späteren Verwendung. Der zurückgegebene Wert ähnelt dem folgenden Text:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Rufen Sie die Adresse der Apache ZooKeeper-Knoten ab. Auch wenn in Ihrem Cluster möglicherweise mehrere ZooKeeper-Knoten vorhanden sind, müssen Sie nur auf einen oder zwei verweisen. Verwenden Sie den folgenden Befehl, um die Adressen in der Variable
KAFKAZKHOSTS
zu speichern:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Bei der Ausführung des Connectors im eigenständigen Modus wird für die Kommunikation mit den Kafka-Brokern die Datei
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
verwendet. Verwenden Sie zum Bearbeiten der Dateiconnect-standalone.properties
den folgenden Befehl:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Nehmen Sie die folgenden Bearbeitungen vor:
Aktueller Wert Neuer Wert Comment bootstrap.servers=localhost:9092
Ersetzen Sie den Wert localhost:9092
durch die Brokerhosts aus dem vorherigen Schritt.Konfiguriert die eigenständige Konfiguration für den Edgeknoten zum Suchen der Kafka-Broker. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Durch diese Änderung können Sie Tests mit dem Konsolenproducer, der bei Kafka enthalten ist, durchführen. Eventuell benötigen Sie für andere Producer und Consumer unterschiedliche Konverter. Informationen zur Verwendung anderer Konverterwerte finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Identisch mit der Angabe. N/V consumer.max.poll.records=10
Am Ende der Datei hinzufügen. Durch diese Änderung soll Timeouts im Senkenconnector verhindert werden, indem dieser auf jeweils 10 Datensätze begrenzt wird. Weitere Informationen finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Drücken Sie zum Speichern der Datei STRG+X, Y und dann die EINGABETASTE.
Um die vom Connector verwendeten Themen zu erstellen, verwenden Sie die folgenden Befehle:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Um zu bestätigen, dass die Themen
iotin
undiotout
erstellt wurden, verwenden Sie den folgenden Befehl:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Das Thema
iotin
dient zum Empfangen von Nachrichten von IoT Hub. Das Themaiotout
dient zum Senden von Nachrichten an IoT Hub.
Abrufen der IoT Hub-Verbindungsinformationen
Führen Sie zum Abrufen von IoT Hub-Informationen, die vom Connector verwendet werden, die folgenden Schritte durch:
Rufen Sie den Event Hub-kompatiblen Endpunkt und den zugehörigen Endpunktnamen für Ihren IoT Hub ab. Sie können diese Informationen mit einer der folgenden Methoden ermitteln:
Führen Sie die folgenden Schritte über das Azure-Portal aus:
Navigieren Sie zu Ihrer IoT Hub-Instanz, und klicken Sie auf Endpunkte.
Wählen Sie unter Integrierte Endpunkte die Option Ereignisse aus.
Kopieren Sie unter Eigenschaften den Wert der folgenden Felder:
- Event Hub-kompatibler Name
- Event Hub-kompatibler Endpunkt
- Partitionen
Wichtig
Der Endpunktwert aus dem Portal enthält möglicherweise zusätzlichen Text, der in diesem Beispiel nicht benötigt wird. Extrahieren Sie den Text, der dem Muster
sb://<randomnamespace>.servicebus.windows.net/
entspricht.
Geben Sie über die Azure CLI den folgenden Befehl ein:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Ersetzen Sie
myhubname
durch den Namen Ihrer IoT Hub-Instanz. Die Antwort ähnelt dem folgenden Text:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Rufen Sie die SAS-Richtlinie und den Schlüssel ab. In diesem Beispiel wird der Dienstschlüssel verwendet. Sie können diese Informationen mit einer der folgenden Methoden ermitteln:
Führen Sie die folgenden Schritte über das Azure-Portal aus:
- Wählen Sie SAS-Richtlinien aus, und klicken Sie dann auf Dienst.
- Kopieren Sie den Wert unter Primärschlüssel.
- Kopieren Sie den Wert von Verbindungszeichenfolge – Primärschlüssel.
Geben Sie über die Azure CLI den folgenden Befehl ein:
Um den Wert des Primärschlüssels zu ermitteln, verwenden Sie den folgenden Befehl:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Ersetzen Sie
myhubname
durch den Namen Ihrer IoT Hub-Instanz. Die Antwort ist der Primärschlüssel für die Richtlinieservice
für diesen Hub.Verwenden Sie zum Abrufen der Verbindungszeichenfolge für die Richtlinie
service
den folgenden Befehl:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Ersetzen Sie
myhubname
durch den Namen Ihrer IoT Hub-Instanz. Die Antwort ist die Verbindungszeichenfolge für die Richtlinieservice
.
Konfigurieren der Quellverbindung
Um die Quelle für die Arbeit mit Ihrer IoT Hub-Instanz zu konfigurieren, führen Sie die folgenden Aktionen über eine SSH-Verbindung mit dem Edgeknoten aus:
Erstellen Sie eine Kopie von der Datei
connect-iot-source.properties
im Verzeichnis/usr/hdp/current/kafka-broker/config/
. Um die Datei aus dem Projekt „toketi-kafka-connect-iothub“ herunterzuladen, verwenden Sie den folgenden Befehl:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Verwenden Sie zum Bearbeiten der Datei
connect-iot-source.properties
und Hinzufügen der IoT Hub-Informationen den folgenden Befehl:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Suchen Sie im Editor nach den folgenden Einträgen, und ändern Sie sie:
Aktueller Wert Edit (Bearbeiten) Kafka.Topic=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durchiotin
. Die vom IoT Hub empfangenen Nachrichten werden in das Themaiotin
platziert.IotHub.EventHubCompatibleName=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durch den Event Hub-kompatiblen Namen.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durch den Event Hub-kompatiblen Endpunkt.IotHub.AccessKeyName=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durchservice
.IotHub.AccessKeyValue=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durch den Primärschlüssel der Richtlinieservice
.IotHub.Partitions=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durch die Anzahl von Partitionen aus den vorherigen Schritten.IotHub.StartTime=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durch ein Datum im UTC-Format. Dieses Datum bezieht sich auf den Zeitpunkt, an dem der Connector mit der Prüfung auf Nachrichten begonnen hat. Das Datumsformat istyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Ersetzen Sie 100
durch5
. Diese Änderung bewirkt, dass der Connector Nachrichten in Kafka liest, sobald fünf neue Nachrichten in IoT Hub vorhanden sind.Eine Beispielkonfiguration finden Sie unter Kafka Connect Source Connector for Azure IoT Hub (Kafka Connect-Quellconnector für Azure IoT Hub).
Drücken Sie zum Speichern der Änderungen STRG+X, Y und dann die EINGABETASTE.
Weitere Informationen zum Konfigurieren der Connectorquelle finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Konfigurieren der Senkenverbindung
Um die Senkenverbindung für die Arbeit mit Ihrer IoT Hub-Instanz zu konfigurieren, führen Sie die folgenden Aktionen über eine SSH-Verbindung mit dem Edgeknoten aus:
Erstellen Sie eine Kopie von der Datei
connect-iothub-sink.properties
im Verzeichnis/usr/hdp/current/kafka-broker/config/
. Um die Datei aus dem Projekt „toketi-kafka-connect-iothub“ herunterzuladen, verwenden Sie den folgenden Befehl:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Verwenden Sie zum Bearbeiten der Datei
connect-iothub-sink.properties
und Hinzufügen der IoT Hub-Informationen den folgenden Befehl:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Suchen Sie im Editor nach den folgenden Einträgen, und ändern Sie sie:
Aktueller Wert Edit (Bearbeiten) topics=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durchiotout
. Nachrichten, die in das Themaiotout
geschrieben wurden, werden an IoT Hub weitergeleitet.IotHub.ConnectionString=PLACEHOLDER
Ersetzen Sie PLACEHOLDER
durch die Verbindungszeichenfolge für die Richtlinieservice
.Eine Beispielkonfiguration finden Sie unter Kafka Connect Sink Connector for Azure IoT Hub (Kafka Connect-Senkenconnector für Azure IoT Hub).
Drücken Sie zum Speichern der Änderungen STRG+X, Y und dann die EINGABETASTE.
Weitere Informationen zum Konfigurieren der Connectorsenke finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Starten des Quellconnectors
Geben Sie zum Starten des Quellconnectors den folgenden Befehl über eine SSH-Verbindung mit dem Edgeknoten ein:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Sobald der Connector gestartet wurde, senden Sie Nachrichten an IoT Hub von Ihren Geräten. Während der Connector Nachrichten von IoT Hub liest und im Kafka-Thema speichert, protokolliert er Informationen in der Konsole:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Hinweis
Beim Starten des Connectors werden möglicherweise mehrere Warnungen angezeigt. Diese Warnungen führen nicht zu Problemen mit dem Empfang von Nachrichten von IoT Hub.
Brechen Sie den Connector nach einigen Minuten ab, indem Sie zweimal STRG+C drücken. Das Anhalten des Connectors dauert einige Minuten.
Starten des Senkenconnectors
Geben Sie über eine SSH-Verbindung mit dem Edgeknoten den folgenden Befehl ein, um den Senkenconnector zu starten:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Bei der Ausführung des Connectors werden Informationen ähnlich dem folgenden Text angezeigt:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Hinweis
Beim Starten des Connectors werden möglicherweise mehrere Warnungen angezeigt. Sie können dies ignorieren.
Senden von Nachrichten
Führen Sie zum Senden von Nachrichten über den Connector folgende Schritte durch:
Öffnen Sie eine zweite SSH-Sitzung mit dem Kafka-Cluster:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Rufen Sie die Adresse der Kafka-Broker für die neue SSH-Sitzung ab. Ersetzen Sie PASSWORD durch das Kennwort für die Clusteranmeldung, und geben Sie dann den folgenden Befehl ein:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Um Nachrichten an das Thema
iotout
zu senden, verwenden Sie folgenden Befehl:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Bei diesem Befehl werden Sie nicht zur gewohnten Bash-Eingabeaufforderung weitergeleitet. Stattdessen sendet diese Tastatureingaben an das Thema
iotout
.Fügen Sie zum Senden einer Nachricht an Ihr Gerät ein JSON-Dokument in der SSH-Sitzung für
kafka-console-producer
ein.Wichtig
Sie müssen den Wert des Eintrags
"deviceId"
auf die ID Ihres Geräts festlegen. Im folgenden Beispiel ist das Gerät mit dem NamenmyDeviceId
versehen:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Das Schema für dieses JSON-Dokument wird unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md ausführlicher beschrieben.
Wenn Sie das simulierte Raspberry Pi-Gerät verwenden und dieses ausgeführt wird, protokolliert das Gerät die folgende Nachricht.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Weitere Informationen zur Verwendung des Senkenconnectors finden Sie unter https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Nächste Schritte
In diesem Dokument haben Sie gelernt, wie Sie mithilfe der Apache Kafka Connect-API den IoT Kafka-Connector in HDInsight starten. Verwenden Sie die folgenden Links, um weitere Möglichkeiten zur Arbeit mit Kafka kennenzulernen: