Uso de Apache Kafka en HDInsight con Azure IoT Hub
Aprenda a usar el conector Apache Kafka Connect Azure IoT Hub para mover datos entre Apache Kafka de HDInsight y Azure IoT Hub. En este documento, aprenderá a ejecutar el conector de IoT Hub desde un nodo perimetral del clúster.
La API de Kafka Connect le permite implementar conectores que insertan datos continuamente en Kafka o extraen datos de Kafka en otro sistema. Apache Kafka Connect Azure IoT Hub es un conector que extrae datos de Azure IoT Hub y los inserta en Kafka. También se pueden insertar datos de Kafka en IoT Hub.
Al extraer datos de IoT Hub, usará un conector de origen. Al insertar en Azure IoT, usará un conector de receptor. El conector de IoT Hub proporciona los conectores de origen y de receptor.
En el siguiente diagrama se muestra el flujo de datos entre Azure IoT Hub y Kafka en HDInsight al usar el conector.
Para obtener más información sobre las API de Connect, consulte https://kafka.apache.org/documentation/#connect.
Requisitos previos
Un clúster de Apache Kafka en HDInsight. Para obtener más información, consulte el documento Inicio rápido de Kafka en HDInsight.
Un nodo perimetral del clúster de Kafka. Para obtener más información, consulte el documento Uso de nodos perimetrales con HDInsight.
Un cliente SSH. Para más información, consulte Conexión a través de SSH con HDInsight (Apache Hadoop).
Una instancia de Azure IoT Hub y un dispositivo. Para este artículo, se recomienda el documento Conexión del simulador en línea Raspberry Pi a Azure IoT Hub.
Creación del conector
Descargue el código fuente del conector desde https://github.com/Azure/toketi-kafka-connect-iothub/ al entorno local.
En un símbolo del sistema, vaya al directorio
toketi-kafka-connect-iothub-master
. Para compilar y empaquetar el proyecto, use el siguiente comando:sbt assembly
La compilación tarda unos minutos en completarse. El comando crea un archivo denominado
kafka-connect-iothub-assembly_2.11-0.7.0.jar
en el directoriotoketi-kafka-connect-iothub-master\target\scala-2.11
para el proyecto.
Instalación del conector
Cargue el archivo .jar en el nodo perimetral del clúster de Kafka en HDInsight. Para modificar el comando siguiente, reemplace
CLUSTERNAME
por el nombre real del clúster. Los valores predeterminados de la cuenta de usuario SSH y el nombre del nodo perimetral se usan para modificar según sea necesario.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Una vez completada la copia del archivo, conéctese al nodo perimetral mediante SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Para instalar el conector en el directorio
libs
de Kafka, use el comando siguiente:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Mantenga la conexión SSH activa durante el resto de pasos.
Configuración de Apache Kafka
Desde la conexión SSH en el nodo perimetral, siga estos pasos para configurar Kafka para que ejecute el conector en modo independiente:
Configure una variable de contraseña. Reemplace PASSWORD por la contraseña de inicio de sesión del clúster y, después, escriba el comando:
export password='PASSWORD'
Instale la utilidad jq. Jq facilita el procesamiento de documentos JSON devueltos por consultas de Ambari. Escriba el comando siguiente:
sudo apt -y install jq
Obtenga la dirección de los agentes de Kafka. Puede haber muchos agentes en el clúster, pero solo debe hacer referencia a uno o dos. Para obtener la dirección de los dos hosts de agente, use el comando siguiente:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Copie estos valores para más adelante. El valor que se devuelve es similar al texto siguiente:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Obtenga la dirección de los nodos de Apache Zookeeper. Hay varios nodos de Zookeeper en el clúster, pero basta con hacer referencia a uno o dos. Use el siguiente comando para almacenar las direcciones en la variable
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Al ejecutar el conector en modo independiente, se usa el archivo
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
para comunicarse con los agentes de Kafka. Para editar el archivoconnect-standalone.properties
, use el comando siguiente:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Haga los siguientes cambios:
Valor actual Valor nuevo Comentario bootstrap.servers=localhost:9092
Reemplace el valor localhost:9092
por los hosts de agente del paso anterior.Realice la configuración independiente del nodo perimetral para encontrar los agentes de Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Este cambio permite probar mediante el productor de consola incluido con Kafka. Puede que necesite convertidores diferentes para otros productores y consumidores. Para información sobre el uso de otros valores de convertidor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Igual que se da. N/D consumer.max.poll.records=10
Agregue al final del archivo. Este cambio es para evitar tiempos de espera en el conector de receptor al limitarlo a 10 registros a la vez. Para obtener más información, vea https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Para guardar el archivo, presione Ctr+X, luego, Y y Entrar.
Para crear los temas que usa el conector, use los comandos siguientes:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Para comprobar que los temas
iotin
yiotout
existen, use el comando siguiente:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
El tema
iotin
se usa para recibir mensajes de IoT Hub. El temaiotout
se usa para enviar mensajes a IoT Hub.
Obtención de información de conexión de IoT Hub
Para recuperar información de IoT Hub que usa el conector, siga estos pasos:
Obtenga el punto de conexión compatible con el centro de eventos y el nombre del punto de conexión compatible con el centro de eventos de su centro de IoT. Para obtener esta información, use uno de los métodos siguientes:
En Azure Portal, use los pasos siguientes:
Vaya a su instancia de IoT Hub y seleccione Puntos de conexión.
En Puntos de conexión integrados, seleccione Eventos.
En Propiedades, copie el valor de los campos siguientes:
- Nombre compatible con Event Hub
- Punto de conexión compatible con Event Hub
- Particiones
Importante
El valor del punto de conexión del portal puede contener texto adicional que no es necesario en este ejemplo. Extraiga el texto que coincida con este patrón
sb://<randomnamespace>.servicebus.windows.net/
.
En la CLI de Azure, use el comando siguiente:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Reemplace
myhubname
por el nombre de su centro de IoT: La respuesta es similar al siguiente texto:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Obtenga la directiva de acceso compartido y la clave. En este ejemplo, use la clave service. Para obtener esta información, use uno de los métodos siguientes:
En Azure Portal, use los pasos siguientes:
- Seleccione Directivas de acceso compartido y, luego, service.
- Copie el valor de Clave principal.
- Copie el valor Connection string--primary key.
En la CLI de Azure, use el comando siguiente:
Para obtener el valor de clave principal, use el comando siguiente:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Reemplace
myhubname
por el nombre de su centro de IoT: La respuesta es la clave principal a la directivaservice
de este centro.Para obtener la cadena de conexión de la directiva
service
, use el comando siguiente:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Reemplace
myhubname
por el nombre de su centro de IoT: La respuesta es la cadena de conexión de la directivaservice
.
Configuración de la conexión de origen
Para configurar el origen para que funcione con su instancia de IoT Hub, realice las siguientes acciones desde una conexión SSH hasta el nodo perimetral:
Cree una copia del archivo
connect-iot-source.properties
en el directorio/usr/hdp/current/kafka-broker/config/
. Para descargar el archivo del proyecto toketi-kafka-connect-iothub, use el comando siguiente:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Para editar el archivo
connect-iot-source.properties
y agregar la información del centro de IoT, use el comando siguiente:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
En el editor, busque y cambie las siguientes entradas:
Valor actual Editar Kafka.Topic=PLACEHOLDER
Reemplace PLACEHOLDER
poriotin
. Los mensajes recibidos del centro de IoT se colocan en el temaiotin
.IotHub.EventHubCompatibleName=PLACEHOLDER
reemplace PLACEHOLDER
por el nombre compatible con el centro de eventos.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
reemplace PLACEHOLDER
por el punto de conexión compatible con el centro de eventos.IotHub.AccessKeyName=PLACEHOLDER
Reemplace PLACEHOLDER
porservice
.IotHub.AccessKeyValue=PLACEHOLDER
reemplace PLACEHOLDER
por la clave principal de la directivaservice
.IotHub.Partitions=PLACEHOLDER
reemplace PLACEHOLDER
por el número de particiones de los pasos anteriores.IotHub.StartTime=PLACEHOLDER
reemplace PLACEHOLDER
por una fecha UTC. Esta fecha corresponde a cuando el conector comienza a buscar mensajes. El formato de fecha esyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Reemplace 100
por5
. Este cambio hace que el conector lea los mensajes en Kafka una vez que hay cinco nuevos mensajes en el centro de IoT.Para ver una configuración de ejemplo, consulte Conector de orígenes de conexión de Kafka para Azure IoT Hub.
Para guardar el archivo, use Ctrl+X, Y y, luego, Entrar.
Para más información sobre cómo configurar el origen del conector, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Configuración de la conexión de receptor
Para configurar la conexión de receptor para que funcione con su instancia de IoT Hub, realice las siguientes acciones desde una conexión SSH hasta el nodo perimetral:
Cree una copia del archivo
connect-iothub-sink.properties
en el directorio/usr/hdp/current/kafka-broker/config/
. Para descargar el archivo del proyecto toketi-kafka-connect-iothub, use el comando siguiente:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Para editar el archivo
connect-iothub-sink.properties
y agregar la información del centro de IoT, use el comando siguiente:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
En el editor, busque y cambie las siguientes entradas:
Valor actual Editar topics=PLACEHOLDER
Reemplace PLACEHOLDER
poriotout
. Los mensajes enviados aiotout
se reenvían al centro de IoT.IotHub.ConnectionString=PLACEHOLDER
reemplace PLACEHOLDER
por la cadena de conexión de la directivaservice
.Para ver una configuración de ejemplo, consulte Conector de receptor de conexión de Kafka para Azure IoT Hub.
Para guardar el archivo, use Ctrl+X, Y y, luego, Entrar.
Para más información sobre cómo configurar el conector de receptor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Inicio del conector de origen
Para iniciar el conector de origen, use el comando siguiente desde una conexión SSH hasta el nodo perimetral:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Una vez que se inicia el conector, envíe mensajes al centro de IoT desde sus dispositivos. A medida que el conector lee los mensajes del centro de IoT y los almacena en el tema de Kafka, registra información en la consola:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Nota:
Es posible que vea varias advertencias cuando se inicia el conector. Estas advertencias no ocasionan problemas con la recepción de mensajes desde el centro de IoT.
Detenga el conector después de unos minutos mediante Ctrl + C dos veces. El conector tarda unos minutos en detenerse.
Inicio del conector de receptor
Desde una conexión SSH hasta el nodo perimetral, use el comando siguiente para iniciar el conector de receptor en modo independiente:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Cuando se ejecuta el conector, se muestra información similar al siguiente texto:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Nota:
Al iniciar el conector, puede que observe varias advertencias. Puede omitir estos errores con seguridad.
Envío de mensajes
Para enviar mensajes mediante el conector, siga estos pasos:
Abra una segunda sesión SSH en el clúster de Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Obtenga la dirección de los agentes de Kafka de la nueva sesión SSH. Reemplace PASSWORD por la contraseña de inicio de sesión del clúster y, después, escriba el comando:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Para enviar mensajes al tema
iotout
, use el comando siguiente:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Este comando no le devuelve al símbolo del sistema normal de Bash. En su lugar, envía la entrada del teclado al tema
iotout
.Para enviar un mensaje a su dispositivo, pegue un documento JSON en la sesión SSH para
kafka-console-producer
.Importante
Debe establecer el valor de la entrada
"deviceId"
en el identificador del dispositivo. En el ejemplo siguiente, el dispositivo se denominamyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
El esquema de este documento JSON se describe con más detalle en https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Si está usando el dispositivo Raspberry Pi simulado y está en ejecución, el dispositivo registra el siguiente mensaje.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Para más información sobre cómo usar el conector de receptor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Pasos siguientes
En este documento, ha aprendido a utilizar Apache Kafka Connect API para iniciar IoT Kafka Connector en HDInsight. Utilice los vínculos siguientes para conocer otras formas de trabajar con Kafka: