Usare Apache Kafka in HDInsight con l'hub IoT
Informazioni su come usare il connettore di Apache Kafka Connect Azure IoT Hub per spostare dati tra Apache Kafka in HDInsight e l'hub IoT di Azure. In questo documento sono incluse informazioni su come eseguire il connettore dell'hub IoT da un nodo perimetrale del cluster.
L'API Kafka Connect consente di implementare connettori che estraggono continuamente i dati in Kafka o eseguono il push di dati da Kafka a un altro sistema. Apache Kafka Connect Azure IoT Hub è un connettore che esegue il pull dei dati dall'hub IoT di Azure a Kafka. Può anche eseguire il push dei dati da Kafka all'hub IoT.
Quando si esegue il pull dall'hub IoT, si usa un connettore di origine. Quando si esegue il push all'hub IoT, si usa un connettore sink. Il connettore hub IoT offre sia il connettore di origine che il connettore sink.
Nel diagramma seguente viene illustrato il flusso di dati tra l'hub IoT e Kafka in HDInsight quando si usa il connettore.
Per altre informazioni su come connettere l'API, vedere https://kafka.apache.org/documentation/#connect.
Prerequisiti
Un cluster Apache Kafka in HDInsight. Per altre informazioni, vedere il documento di avvio rapido di Kafka in HDInsight.
Un nodo perimetrale nel cluster Kafka. Per altre informazioni, vedere Usare nodi perimetrali con il documento HDInsight .
Un client SSH. Per altre informazioni, vedere Connettersi a HDInsight (Apache Hadoop) con SSH.
Un hub IoT di Azure e un dispositivo. Per questo articolo, è consigliabile usare il simulatore online Connect Raspberry Pi per hub IoT di Azure.
Strumento di compilazione Scala.
Compilare il connettore
Scaricare l'origine per il connettore da https://github.com/Azure/toketi-kafka-connect-iothub/ all'ambiente locale.
Da un prompt dei comandi passare alla
toketi-kafka-connect-iothub-master
directory. Usare quindi il comando seguente per compilare e creare il pacchetto del progetto:sbt assembly
Il completamento della compilazione richiede alcuni minuti. Il comando crea un file denominato
kafka-connect-iothub-assembly_2.11-0.7.0.jar
nellatoketi-kafka-connect-iothub-master\target\scala-2.11
directory per il progetto.
Installare il connettore
Caricare il file .jar nel nodo perimetrale del cluster Kafka in HDInsight. Modificare il comando seguente sostituendo
CLUSTERNAME
con il nome effettivo del cluster. I valori predefiniti per l'account utente SSH e il nome del nodo perimetrale vengono usati per modificare in base alle esigenze.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Una volta completata la copia del file, connettersi al nodo perimetrale mediante SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Per installare il connettore nella directory
libs
di Kafka, usare il comando seguente:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Mantenere attiva la connessione SSH per i passaggi rimanenti.
Configurare Apache Kafka
Dalla connessione SSH al nodo perimetrale, seguire questa procedura per configurare Kafka per eseguire il connettore in modalità autonoma:
Configurare la variabile di password. Sostituire PASSWORD con la password di accesso del cluster, quindi immettere il comando :
export password='PASSWORD'
Installare l'utilità jq . jq semplifica l'elaborazione dei documenti JSON restituiti dalle query Ambari. Immettere il comando seguente:
sudo apt -y install jq
Ottenere l'indirizzo dei broker Kafka. Nel cluster possono esserci molti broker, ma è sufficiente fare riferimento a uno o due di essi. Per ottenere l'indirizzo di due host di broker, usare il comando seguente:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Copiare i valori per usarli in un secondo momento. Il valore restituito è simile al testo seguente:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Ottenere l'indirizzo dei nodi Apache ZooKeeper. Esistono diversi nodi Zookeeper nel cluster, ma è sufficiente fare riferimento a uno o due di essi. Usare il comando seguente per archiviare gli indirizzi nella variabile
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Quando si esegue il connettore in modalità autonoma, il
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
file viene usato per comunicare con i broker Kafka. Per modificare il fileconnect-standalone.properties
, usare il comando seguente:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Apportare le modifiche seguenti:
Valore corrente Nuovo valore Commento bootstrap.servers=localhost:9092
Sostituire il localhost:9092
valore con gli host broker del passaggio precedenteConfigura la configurazione autonoma per il nodo perimetrale per trovare i broker Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
La modifica consente di eseguire test usando il producer di console incluso in Kafka. Potrebbero essere necessari diversi convertitori per altri producer e consumer. Per informazioni sull'uso di altri valori del convertitore, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Uguale a quanto specificato. N/D consumer.max.poll.records=10
Aggiungi alla fine del file. Questa modifica è finalizzata a evitare i timeout nel connettore sink mediante l'impostazione di un limite di 10 record alla volta. Per ulteriori informazioni, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Per salvare il file, usare Ctrl + X, Y e INVIO.
Per creare gli argomenti usati dal connettore, usare i comandi seguenti:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Per verificare che gli argomenti
iotin
eiotout
esistano, usare il comando seguente:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
L'argomento
iotin
viene usato per ricevere messaggi dall'hub IoT. L'argomentoiotout
viene usato per inviare messaggi all'hub IoT.
Ottenere informazioni di connessione dell'hub IoT
Per recuperare informazioni sull'hub IoT usato dal connettore, attenersi alla procedura seguente:
Ottenere l'endpoint compatibile con hub eventi e il nome dell'endpoint compatibile con hub eventi per l'hub IoT. Per ottenere tali informazioni, usare uno dei metodi seguenti:
Dal portale di Azure, attenersi alla procedura seguente:
Passare all'hub IoT e selezionare Endpoint.
Da Endpoint predefiniti, selezionare Eventi.
Da Proprietà, copiare il valore dei campi seguenti:
- Nome compatibile con l'hub eventi
- Endpoint compatibile con l'hub eventi
- Partitions
Importante
Il valore dell'endpoint del portale può contenere testo aggiuntivo che non è necessario in questo esempio. Estrarre il testo che corrisponde al criterio
sb://<randomnamespace>.servicebus.windows.net/
.
Dall'interfaccia della riga di comando di Azure, usare il comando seguente:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Sostituire
myhubname
con il nome dell'hub IoT. La risposta restituita è simile al testo seguente:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Ottenere i criteri di accesso condiviso e la chiave. Per questo esempio, usare la chiave service. Per ottenere tali informazioni, usare uno dei metodi seguenti:
Dal portale di Azure, attenersi alla procedura seguente:
- Selezionare Criteri di accesso condiviso e quindi service.
- Copiare il valore di Chiave primaria.
- Copiare il valore di Stringa di connessione - chiave primaria.
Dall'interfaccia della riga di comando di Azure, usare il comando seguente:
Per ottenere il valore della chiave primaria, usare il comando seguente:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Sostituire
myhubname
con il nome dell'hub IoT. La risposta è la chiave primaria dei criteriservice
per l'hub.Per ottenere la stringa di connessione per i criteri
service
, usare il comando seguente:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Sostituire
myhubname
con il nome dell'hub IoT. La risposta è la stringa di connessione per i criteriservice
.
Configurare la connessione di origine
Per configurare l'origine per l'hub IoT, eseguire le azioni seguenti da una connessione SSH al nodo perimetrale:
Creare una copia del file
connect-iot-source.properties
nella directory/usr/hdp/current/kafka-broker/config/
. Per scaricare il file dal progetto toketi-kafka-connect-iothub, usare il comando seguente:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Per modificare il file
connect-iot-source.properties
e aggiungere le informazioni dell'hub IoT, usare il comando seguente:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Nell'editor individuare e modificare le voci seguenti:
Valore corrente Modifica Kafka.Topic=PLACEHOLDER
Sostituisci PLACEHOLDER
coniotin
. I messaggi ricevuti dall'hub IoT vengono salvati nell'argomentoiotin
.IotHub.EventHubCompatibleName=PLACEHOLDER
sostituire PLACEHOLDER
con il nome compatibile con l'hub eventi.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
sostituire PLACEHOLDER
con l'endpoint compatibile con l'hub eventi.IotHub.AccessKeyName=PLACEHOLDER
Sostituisci PLACEHOLDER
conservice
.IotHub.AccessKeyValue=PLACEHOLDER
sostituire PLACEHOLDER
con la chiave primaria dei criteriservice
.IotHub.Partitions=PLACEHOLDER
sostituire PLACEHOLDER
con il numero di partizioni dei passaggi precedenti.IotHub.StartTime=PLACEHOLDER
sostituire PLACEHOLDER
con una data UTC. Tale data indica il momento in cui il connettore inizia a controllare i messaggi. Il formato della data èyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Sostituisci 100
con5
. Questa modifica fa sì che il connettore legga i messaggi in Kafka quando sono presenti cinque nuovi messaggi nell'hub IoT.Per un esempio di configurazione, vedere Connettore di origine Kafka Connect per hub IoT di Azure.
Per salvare le modifiche, usare CTRL + X, Y e quindi INVIO.
Per altre informazioni sulla configurazione dell'origine del connettore, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Configurare la connessione sink
Per configurare la connessione sink per l'hub IoT, eseguire le azioni seguenti da una connessione SSH al nodo perimetrale:
Creare una copia del file
connect-iothub-sink.properties
nella directory/usr/hdp/current/kafka-broker/config/
. Per scaricare il file dal progetto toketi-kafka-connect-iothub, usare il comando seguente:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Per modificare il file
connect-iothub-sink.properties
e aggiungere le informazioni dell'hub IoT, usare il comando seguente:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Nell'editor individuare e modificare le voci seguenti:
Valore corrente Modifica topics=PLACEHOLDER
Sostituisci PLACEHOLDER
coniotout
. I messaggi scritti nell'argomentoiotout
vengono inoltrati all'hub IoT.IotHub.ConnectionString=PLACEHOLDER
sostituire PLACEHOLDER
con la stringa di connessione per i criteriservice
.Per un esempio di configurazione, vedere Kafka Connect Sink Connector per hub IoT di Azure.
Per salvare le modifiche, usare CTRL + X, Y e quindi INVIO.
Per altre informazioni sulla configurazione del sink del connettore, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Avviare il connettore di origine
Per avviare il connettore di origine, usare il comando seguente da una connessione SSH al nodo perimetrale:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Una volta avviato il connettore, inviare messaggi all'hub IoT dal proprio dispositivo. Quando il connettore legge i messaggi dall'hub IoT e li archivia nell'argomento di Kafka, registra informazioni nella console:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Nota
All'avvio del connettore è possibile che vengano visualizzati diversi avvisi. Tali avvisi non causano problemi alla ricezione dei messaggi dall'hub IoT.
Arrestare il connettore dopo alcuni minuti usando CTRL+C due volte. L'arresto del connettore richiede alcuni minuti.
Avviare il connettore sink
Per avviare il connettore sink in modalità autonoma, usare il comando seguente da una connessione SSH al nodo perimetrale:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Quando viene avviata l'esecuzione del connettore, vengono visualizzate informazioni simili al testo seguente:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Nota
All'avvio del connettore è possibile che vengano visualizzati diversi avvisi. È possibile ignorare questi avvisi.
Inviare messaggi
Per inviare messaggi tramite il connettore, attenersi alla procedura seguente:
Aprire una seconda sessione SSH per il cluster Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Ottenere l'indirizzo dei broker Kafka per la nuova sessione SSH. Sostituire PASSWORD con la password di accesso del cluster, quindi immettere il comando :
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Per inviare messaggi all'argomento
iotout
, usare il comando seguente:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Questo comando non restituisce il normale prompt di Bash. Invece, invia input da tastiera all'argomento
iotout
.Per inviare un messaggio al dispositivo, incollare un documento con estensione json nella sessione SSH per
kafka-console-producer
.Importante
È necessario impostare il valore della voce
"deviceId"
sull'ID del dispositivo. Nell'esempio seguente il dispositivo è denominatomyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Lo schema per il documento con estensione json è descritto in modo più dettagliato all'indirizzo https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Se si usa il dispositivo Raspberry Pi simulato ed è in esecuzione, il dispositivo registra il messaggio seguente.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Per altre informazioni sull'uso del connettore sink, vedere https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Passaggi successivi
In questo documento è stato descritto come usare l'API Apache Kafka Connect per avviare il connettore IoT per Kafka in HDInsight. Per trovare altri modi per lavorare con Kafka, vedere i collegamenti seguenti: