Korzystanie z platformy Apache Kafka w usłudze HDInsight z usługą Azure IoT Hub
Dowiedz się, jak używać łącznika usługi Azure IoT Hub platformy Apache Kafka Connect do przenoszenia danych między platformą Apache Kafka w usłudze HDInsight i usłudze Azure IoT Hub. Z tego dokumentu dowiesz się, jak uruchomić łącznik usługi IoT Hub z węzła brzegowego w klastrze.
Interfejs API platformy Kafka Connect umożliwia implementowanie łączników, które stale ściągają dane do platformy Kafka lub wypychają dane z platformy Kafka do innego systemu. Apache Kafka Connect Azure IoT Hub to łącznik, który ściąga dane z usługi Azure IoT Hub do platformy Kafka. Może również wypychać dane z platformy Kafka do usługi IoT Hub.
Podczas ściągania z usługi IoT Hub należy użyć łącznika źródłowego. Podczas wypychania do usługi IoT Hub należy użyć łącznika ujścia. Łącznik usługi IoT Hub udostępnia łączniki źródła i ujścia.
Na poniższym diagramie przedstawiono przepływ danych między usługą Azure IoT Hub i platformą Kafka w usłudze HDInsight podczas korzystania z łącznika.
Aby uzyskać więcej informacji na temat łączenia interfejsu API, zobacz https://kafka.apache.org/documentation/#connect.
Wymagania wstępne
Klaster platformy Apache Kafka w usłudze HDInsight. Aby uzyskać więcej informacji, zobacz dokument Szybki start dotyczący platformy Kafka w usłudze HDInsight.
Węzeł brzegowy w klastrze platformy Kafka. Aby uzyskać więcej informacji, zobacz Dokument Use edge nodes with HDInsight (Używanie węzłów brzegowych z usługą HDInsight ).
Klient SSH. Aby uzyskać więcej informacji, zobacz Łączenie się z usługą HDInsight (Apache Hadoop) przy użyciu protokołu SSH.
Usługa Azure IoT Hub i urządzenie. W tym artykule rozważ użycie symulatora online urządzenia Raspberry Pi do usługi Azure IoT Hub.
Narzędzie do kompilacji Języka Scala.
Tworzenie łącznika
Pobierz źródło łącznika ze https://github.com/Azure/toketi-kafka-connect-iothub/ środowiska lokalnego.
W wierszu polecenia przejdź do
toketi-kafka-connect-iothub-master
katalogu. Następnie użyj następującego polecenia, aby skompilować i spakować projekt:sbt assembly
Ukończenie kompilacji trwa kilka minut. Polecenie tworzy plik o nazwie
kafka-connect-iothub-assembly_2.11-0.7.0.jar
wtoketi-kafka-connect-iothub-master\target\scala-2.11
katalogu dla projektu.
Instalowanie łącznika
Przekaż plik .jar do węzła brzegowego platformy Kafka w klastrze usługi HDInsight. Zmodyfikuj następujące polecenie, zastępując
CLUSTERNAME
element rzeczywistą nazwą klastra. Wartości domyślne dla konta użytkownika SSH i nazwy węzła krawędzi są używane do modyfikowania zgodnie z potrzebami.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Po zakończeniu kopiowania pliku połącz się z węzłem krawędzi przy użyciu protokołu SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Aby zainstalować łącznik w katalogu platformy Kafka
libs
, użyj następującego polecenia:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Zachowaj aktywne połączenie SSH, aby wykonać pozostałe kroki.
Konfigurowanie platformy Apache Kafka
Z poziomu połączenia SSH z węzłem krawędzi wykonaj następujące kroki, aby skonfigurować platformę Kafka do uruchamiania łącznika w trybie autonomicznym:
Konfigurowanie zmiennej hasła. Zastąp ciąg PASSWORD hasłem logowania klastra, a następnie wprowadź polecenie:
export password='PASSWORD'
Zainstaluj narzędzie jq. jq ułatwia przetwarzanie dokumentów JSON zwracanych z zapytań ambari. Podaj następujące polecenie:
sudo apt -y install jq
Uzyskaj adres brokerów platformy Kafka. W klastrze może istnieć wiele brokerów, ale wystarczy odwołać się tylko do jednego lub dwóch brokerów. Aby uzyskać adres dwóch hostów brokera, użyj następującego polecenia:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Skopiuj wartości do późniejszego użycia. Zwrócona wartość będzie podobna do następującego tekstu:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Pobierz adres węzłów usługi Apache Zookeeper. W klastrze znajduje się kilka węzłów dozorców, ale wystarczy odwołać się tylko do jednego lub dwóch węzłów. Użyj następującego polecenia, aby zapisać adresy w zmiennej
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Po uruchomieniu łącznika w trybie
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
autonomicznym plik jest używany do komunikowania się z brokerami platformy Kafka. Aby edytowaćconnect-standalone.properties
plik, użyj następującego polecenia:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Wprowadź następujące zmiany:
Bieżąca wartość Nowa wartość Komentarz bootstrap.servers=localhost:9092
Zastąp localhost:9092
wartość hostami brokera z poprzedniego krokuKonfiguruje konfigurację autonomiczną węzła brzegowego w celu znalezienia brokerów platformy Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Ta zmiana umożliwia przetestowanie przy użyciu producenta konsoli dołączonego do platformy Kafka. Mogą być potrzebne różne konwertery dla innych producentów i konsumentów. Aby uzyskać informacje na temat używania innych wartości konwertera, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Takie same jak podane. Nie dotyczy consumer.max.poll.records=10
Dodaj do końca pliku. Ta zmiana polega na zapobieganiu przekroczeniom limitu czasu w łączniku ujścia przez ograniczenie go do 10 rekordów naraz. Aby uzyskać więcej informacji, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Aby zapisać plik, użyj Ctrl + X, Y, a następnie wprowadź.
Aby utworzyć tematy używane przez łącznik, użyj następujących poleceń:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Aby sprawdzić, czy tematy
iotin
iiotout
istnieją, użyj następującego polecenia:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Temat
iotin
służy do odbierania komunikatów z usługi IoT Hub. Tematiotout
służy do wysyłania komunikatów do usługi IoT Hub.
Uzyskiwanie informacji o połączeniu z usługą IoT Hub
Aby pobrać informacje o centrum IoT używane przez łącznik, wykonaj następujące kroki:
Pobierz punkt końcowy zgodny z centrum zdarzeń i nazwę punktu końcowego zgodnego z centrum zdarzeń dla centrum IoT. Aby uzyskać te informacje, użyj jednej z następujących metod:
W witrynie Azure Portal wykonaj następujące czynności:
Przejdź do centrum IoT Hub i wybierz pozycję Punkty końcowe.
W obszarze Wbudowane punkty końcowe wybierz pozycję Zdarzenia.
Z obszaru Właściwości skopiuj wartość następujących pól:
- Nazwa zgodna z centrum zdarzeń
- Punkt końcowy zgodny z centrum zdarzeń
- Partycje
Ważne
Wartość punktu końcowego z portalu może zawierać dodatkowy tekst, który nie jest potrzebny w tym przykładzie. Wyodrębnij tekst pasujący do tego wzorca
sb://<randomnamespace>.servicebus.windows.net/
.
W interfejsie wiersza polecenia platformy Azure użyj następującego polecenia:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Zastąp
myhubname
ciąg nazwą centrum IoT Hub. Odpowiedź jest podobna do następującego tekstu:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Uzyskaj zasady dostępu współdzielonego i klucz. W tym przykładzie użyj klucza usługi . Aby uzyskać te informacje, użyj jednej z następujących metod:
W witrynie Azure Portal wykonaj następujące czynności:
- Wybierz pozycję Zasady dostępu współdzielonego, a następnie wybierz pozycję Usługa.
- Skopiuj wartość klucza podstawowego.
- Skopiuj wartość Parametry połączenia — klucz podstawowy.
W interfejsie wiersza polecenia platformy Azure użyj następującego polecenia:
Aby uzyskać wartość klucza podstawowego, użyj następującego polecenia:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Zastąp
myhubname
ciąg nazwą centrum IoT Hub. Odpowiedź jest kluczemservice
podstawowym zasad dla tego centrum.Aby uzyskać parametry połączenia dla
service
zasad, użyj następującego polecenia:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Zastąp
myhubname
ciąg nazwą centrum IoT Hub. Odpowiedź to parametry połączenia zasadservice
.
Konfigurowanie połączenia źródłowego
Aby skonfigurować źródło do pracy z usługą IoT Hub, wykonaj następujące akcje z połączenia SSH z węzłem krawędzi:
Utwórz kopię
connect-iot-source.properties
pliku w/usr/hdp/current/kafka-broker/config/
katalogu. Aby pobrać plik z projektu toketi-kafka-connect-iothub, użyj następującego polecenia:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Aby edytować
connect-iot-source.properties
plik i dodać informacje o centrum IoT Hub, użyj następującego polecenia:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
W edytorze znajdź i zmień następujące wpisy:
Bieżąca wartość Edytuj Kafka.Topic=PLACEHOLDER
Zamień PLACEHOLDER
naiotin
. Komunikaty odebrane z centrum IoT są umieszczane w temacieiotin
.IotHub.EventHubCompatibleName=PLACEHOLDER
Zastąp PLACEHOLDER
ciąg nazwą zgodną z centrum zdarzeń.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Zastąp element PLACEHOLDER
punktem końcowym zgodnym z centrum zdarzeń.IotHub.AccessKeyName=PLACEHOLDER
Zamień PLACEHOLDER
naservice
.IotHub.AccessKeyValue=PLACEHOLDER
Zastąp PLACEHOLDER
element kluczemservice
podstawowym zasad.IotHub.Partitions=PLACEHOLDER
Zastąp PLACEHOLDER
ciąg liczbą partycji z poprzednich kroków.IotHub.StartTime=PLACEHOLDER
Zastąp element PLACEHOLDER
datą UTC. Ta data to godzina rozpoczęcia sprawdzania komunikatów przez łącznik. Format daty toyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Zamień 100
na5
. Ta zmiana powoduje, że łącznik odczytuje komunikaty na platformie Kafka po utworzeniu pięciu nowych komunikatów w centrum IoT.Aby uzyskać przykładową konfigurację, zobacz Kafka Connect Source Connector for Azure IoT Hub (Łącznik źródła programu Kafka Connect dla usługi Azure IoT Hub).
Aby zapisać zmiany, użyj Ctrl + X, Y, a następnie wprowadź.
Aby uzyskać więcej informacji na temat konfigurowania źródła łącznika, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Konfigurowanie połączenia ujścia
Aby skonfigurować połączenie ujścia do pracy z usługą IoT Hub, wykonaj następujące akcje z połączenia SSH z węzłem krawędzi:
Utwórz kopię
connect-iothub-sink.properties
pliku w/usr/hdp/current/kafka-broker/config/
katalogu. Aby pobrać plik z projektu toketi-kafka-connect-iothub, użyj następującego polecenia:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Aby edytować
connect-iothub-sink.properties
plik i dodać informacje o centrum IoT Hub, użyj następującego polecenia:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
W edytorze znajdź i zmień następujące wpisy:
Bieżąca wartość Edytuj topics=PLACEHOLDER
Zamień PLACEHOLDER
naiotout
. Komunikaty zapisywane wiotout
temacie są przekazywane do centrum IoT Hub.IotHub.ConnectionString=PLACEHOLDER
Zastąp PLACEHOLDER
element parametry połączenia zasadservice
.Aby uzyskać przykładową konfigurację, zobacz Kafka Connect Sink Connector for Azure IoT Hub (Łącznik ujścia programu Kafka Connect dla usługi Azure IoT Hub).
Aby zapisać zmiany, użyj Ctrl + X, Y, a następnie wprowadź.
Aby uzyskać więcej informacji na temat konfigurowania ujścia łącznika, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Uruchamianie łącznika źródłowego
Aby uruchomić łącznik źródłowy, użyj następującego polecenia z połączenia SSH z węzłem krawędzi:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Po uruchomieniu łącznika wyślij komunikaty do centrum IoT Hub z urządzeń. Gdy łącznik odczytuje komunikaty z centrum IoT i przechowuje je w temacie platformy Kafka, rejestruje informacje w konsoli:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Uwaga
Podczas uruchamiania łącznika może zostać wyświetlonych kilka ostrzeżeń. Te ostrzeżenia nie powodują problemów z odbieraniem komunikatów z centrum IoT Hub.
Zatrzymaj łącznik po kilku minutach za pomocą Ctrl + C dwa razy. Zatrzymanie łącznika trwa kilka minut.
Uruchamianie łącznika ujścia
Z poziomu połączenia SSH z węzłem krawędzi użyj następującego polecenia, aby uruchomić łącznik ujścia w trybie autonomicznym:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
W miarę uruchamiania łącznika wyświetlane są informacje podobne do następującego tekstu:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Uwaga
Podczas uruchamiania łącznika można zauważyć kilka ostrzeżeń. Możesz je bezpiecznie zignorować.
Wysyłanie komunikatów
Aby wysyłać komunikaty za pośrednictwem łącznika, wykonaj następujące czynności:
Otwórz drugą sesję SSH w klastrze platformy Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Uzyskaj adres brokerów platformy Kafka dla nowej sesji SSH. Zastąp ciąg PASSWORD hasłem logowania klastra, a następnie wprowadź polecenie:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Aby wysłać komunikaty do tematu
iotout
, użyj następującego polecenia:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
To polecenie nie zwraca normalnego wiersza polecenia powłoki Bash. Zamiast tego wysyła dane wejściowe klawiatury do tematu
iotout
.Aby wysłać komunikat do urządzenia, wklej dokument JSON do sesji SSH dla elementu
kafka-console-producer
.Ważne
Musisz ustawić wartość
"deviceId"
wpisu na identyfikator urządzenia. W poniższym przykładzie urządzenie ma nazwęmyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Schemat dla tego dokumentu JSON został opisany bardziej szczegółowo w temacie https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Jeśli używasz symulowanego urządzenia Raspberry Pi i jest ono uruchomione, urządzenie rejestruje następujący komunikat.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Aby uzyskać więcej informacji na temat korzystania z łącznika ujścia, zobacz https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Następne kroki
W tym dokumencie przedstawiono sposób uruchamiania łącznika IoT Kafka Connector w usłudze HDInsight przy użyciu interfejsu API programu Apache Kafka Connect. Skorzystaj z poniższych linków, aby odnaleźć inne sposoby pracy z platformą Kafka: