Usar o Apache Kafka no HDInsight com o Hub IoT do Azure
Saiba como usar o conector Apache Kafka Connect do Hub IoT do Azure para mover dados entre o Apache Kafka no HDInsight e o Hub IoT do Azure. Neste documento, você aprenderá a executar o conector do Hub IoT a partir de um nó de borda no cluster.
A API do Kafka Connect permite implementar conectores que extraem dados continuamente para o Kafka ou enviam dados do Kafka para outro sistema. O Apache Kafka Connect Azure IoT Hub é um conector que extrai dados do Hub IoT do Azure para o Kafka. Ele também pode enviar dados de Kafka para o Hub IoT.
Ao extrair do Hub IoT, você usa um conector de origem . Ao enviar por push para o Hub IoT, você usa um conector de coletor . O conector do Hub IoT fornece os conectores de origem e coletor.
O diagrama a seguir mostra o fluxo de dados entre o Hub IoT do Azure e o Kafka no HDInsight ao usar o conector.
Para obter mais informações sobre a API Connect, consulte https://kafka.apache.org/documentation/#connect.
Pré-requisitos
Um cluster Apache Kafka no HDInsight. Para obter mais informações, veja o documento Início rápido do Kafka no HDInsight.
Um nó de borda no cluster de Kafka. Para obter mais informações, consulte o documento Usar nós de borda com o HDInsight .
Um cliente SSH. Para obter mais informações, veja Ligar ao HDInsight (Apache Hadoop) através de SSH.
Um Hub IoT do Azure e um dispositivo. Para este artigo, considere usar o simulador online Connect Raspberry Pi ao Hub IoT do Azure.
Ferramenta de construção Scala.
Construa o conector
Transfira a origem do https://github.com/Azure/toketi-kafka-connect-iothub/ conector para o seu ambiente local.
Em um prompt de comando, navegue até o
toketi-kafka-connect-iothub-master
diretório. Em seguida, use o seguinte comando para criar e empacotar o projeto:sbt assembly
A compilação leva alguns minutos para ser concluída. O comando cria um arquivo nomeado
kafka-connect-iothub-assembly_2.11-0.7.0.jar
notoketi-kafka-connect-iothub-master\target\scala-2.11
diretório para o projeto.
Instale o conector
Carregue o arquivo .jar para o nó de borda do seu cluster Kafka no HDInsight. Edite o comando a seguir substituindo
CLUSTERNAME
pelo nome real do cluster. Os valores padrão para a conta de usuário SSH e o nome do nó de borda são usados e modificados conforme necessário.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Quando a cópia do arquivo for concluída, conecte-se ao nó de borda usando SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Para instalar o conector no diretório Kafka
libs
, use o seguinte comando:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Mantenha sua conexão SSH ativa para as etapas restantes.
Configurar o Apache Kafka
Da sua conexão SSH para o nó de borda, use as seguintes etapas para configurar o Kafka para executar o conector no modo autônomo:
Configure a variável de senha. Substitua PASSWORD pela senha de login do cluster e digite o comando:
export password='PASSWORD'
Instale o utilitário jq . jq facilita o processamento de documentos JSON retornados de consultas Ambari. Introduza o seguinte comando:
sudo apt -y install jq
Obtenha o endereço dos corretores Kafka. Pode haver muitos corretores em seu cluster, mas você só precisa fazer referência a um ou dois. Para obter o endereço de dois hosts de broker, use o seguinte comando:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Copie os valores para uso posterior. O valor devolvido é semelhante ao seguinte texto:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Obtenha o endereço dos nós do Apache Zookeeper. Há vários nós do Zookeeper no cluster, mas você só precisa fazer referência a um ou dois. Use o seguinte comando para armazenar os endereços na variável
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Ao executar o conector no modo autônomo, o
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
arquivo é usado para se comunicar com os corretores Kafka. Para editar oconnect-standalone.properties
arquivo, use o seguinte comando:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Faça as seguintes edições:
Valor atual Novo valor Comentário bootstrap.servers=localhost:9092
Substitua o localhost:9092
valor pelos hosts do broker da etapa anteriorConfigura a configuração autônoma para o nó de borda para localizar os corretores Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Essa alteração permite que você teste usando o produtor do console incluído no Kafka. Pode necessitar de fabricantes de embalagens diferentes para outros produtores e consumidores. Para obter informações sobre como usar outros valores do conversor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
O mesmo que dado. N/A consumer.max.poll.records=10
Adicionar ao final do arquivo. Essa alteração é para evitar tempos limite no conector do coletor, limitando-o a 10 registros de cada vez. Para obter mais informações, veja https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Para guardar o ficheiro, utilize Ctrl + X, Y e, em seguida, Enter.
Para criar os tópicos usados pelo conector, use os seguintes comandos:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Para verificar se os
iotin
tópicos eiotout
existem, use o seguinte comando:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
O
iotin
tópico é usado para receber mensagens do Hub IoT. Oiotout
tópico é usado para enviar mensagens para o Hub IoT.
Obter informações de conexão do Hub IoT
Para recuperar informações do hub IoT usadas pelo conector, use as seguintes etapas:
Obtenha o ponto de extremidade compatível com o Hub de Eventos e o nome do ponto de extremidade compatível com o Hub de Eventos para seu hub IoT. Para obter essas informações, use um dos seguintes métodos:
No portal do Azure, use as seguintes etapas:
Navegue até o Hub IoT e selecione Pontos de extremidade.
Em Pontos de extremidade internos, selecione Eventos.
Em Propriedades, copie o valor dos seguintes campos:
- Nome compatível com o Hub de Eventos
- Ponto de extremidade compatível com o Hub de Eventos
- Partições
Importante
O valor do ponto de extremidade do portal pode conter texto extra que não é necessário neste exemplo. Extraia o texto que corresponde a este padrão
sb://<randomnamespace>.servicebus.windows.net/
.
Na CLI do Azure, use o seguinte comando:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Substitua
myhubname
pelo nome do seu hub IoT. A resposta é semelhante ao seguinte texto:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Obtenha a política e a chave de acesso compartilhado. Para este exemplo, use a chave de serviço . Para obter essas informações, use um dos seguintes métodos:
No portal do Azure, use as seguintes etapas:
- Selecione Políticas de acesso compartilhado e, em seguida, selecione serviço.
- Copie o valor da chave primária.
- Copie a cadeia de conexão - valor da chave primária.
Na CLI do Azure, use o seguinte comando:
Para obter o valor da chave primária, use o seguinte comando:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Substitua
myhubname
pelo nome do seu hub IoT. A resposta é a chave primária para aservice
política para este hub.Para obter a cadeia de conexão para a
service
política, use o seguinte comando:az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
Substitua
myhubname
pelo nome do seu hub IoT. A resposta é a cadeia de conexão para aservice
política.
Configurar a conexão de origem
Para configurar a origem para trabalhar com seu Hub IoT, execute as seguintes ações de uma conexão SSH com o nó de borda:
Crie uma cópia do
connect-iot-source.properties
arquivo no/usr/hdp/current/kafka-broker/config/
diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Para editar o
connect-iot-source.properties
arquivo e adicionar as informações do hub IoT, use o seguinte comando:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
No editor, localize e altere as seguintes entradas:
Valor atual Editar Kafka.Topic=PLACEHOLDER
Substituir PLACEHOLDER
poriotin
. As mensagens recebidas do hub IoT são colocadas noiotin
tópico.IotHub.EventHubCompatibleName=PLACEHOLDER
Substitua pelo nome compatível com o Hub de Eventos PLACEHOLDER
.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Substitua pelo ponto de extremidade compatível com o Hub de Eventos PLACEHOLDER
.IotHub.AccessKeyName=PLACEHOLDER
Substituir PLACEHOLDER
porservice
.IotHub.AccessKeyValue=PLACEHOLDER
Substitua PLACEHOLDER
pela chave primária daservice
política.IotHub.Partitions=PLACEHOLDER
Substitua PLACEHOLDER
pelo número de partições das etapas anteriores.IotHub.StartTime=PLACEHOLDER
Substitua PLACEHOLDER
por uma data UTC. Esta data é quando o conector começa a verificar se há mensagens. O formato de data éyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Substituir 100
por5
. Essa alteração faz com que o conector leia mensagens no Kafka quando houver cinco novas mensagens no hub IoT.Para obter um exemplo de configuração, consulte Kafka Connect Source Connector for Azure IoT Hub.
Para guardar as alterações, utilize Ctrl + X, Y e, em seguida, Enter.
Para obter mais informações sobre como configurar a origem do conector, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Configurar a conexão do coletor
Para configurar a conexão do coletor para funcionar com seu Hub IoT, execute as seguintes ações de uma conexão SSH com o nó de borda:
Crie uma cópia do
connect-iothub-sink.properties
arquivo no/usr/hdp/current/kafka-broker/config/
diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Para editar o
connect-iothub-sink.properties
arquivo e adicionar as informações do hub IoT, use o seguinte comando:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
No editor, localize e altere as seguintes entradas:
Valor atual Editar topics=PLACEHOLDER
Substituir PLACEHOLDER
poriotout
. As mensagens gravadas noiotout
tópico são encaminhadas para o hub IoT.IotHub.ConnectionString=PLACEHOLDER
Substitua PLACEHOLDER
pela cadeia de conexão daservice
política.Para obter um exemplo de configuração, consulte Kafka Connect Sink Connector for Azure IoT Hub.
Para guardar as alterações, utilize Ctrl + X, Y e, em seguida, Enter.
Para obter mais informações sobre como configurar o coletor de conectores, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Inicie o conector de origem
Para iniciar o conector de origem, use o seguinte comando de uma conexão SSH para o nó de borda:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Assim que o conector for iniciado, envie mensagens para o hub IoT a partir do(s) seu(s) dispositivo(s). À medida que o conector lê mensagens do hub IoT e as armazena no tópico Kafka, ele registra informações no console:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Nota
Você pode ver vários avisos quando o conector é iniciado. Esses avisos não causam problemas com o recebimento de mensagens do hub IoT.
Pare o conector após alguns minutos usando Ctrl + C duas vezes. Demora alguns minutos para o conector parar.
Inicie o conector do coletor
De uma conexão SSH para o nó de borda, use o seguinte comando para iniciar o conector do coletor no modo autônomo:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
À medida que o conector é executado, informações semelhantes ao seguinte texto são exibidas:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Nota
Você pode notar vários avisos quando o conector é iniciado. Pode ignorar com segurança estes avisos.
Enviar mensagens
Para enviar mensagens através do conector, use as seguintes etapas:
Abra uma segunda sessão SSH para o cluster Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Obtenha o endereço dos corretores Kafka para a nova sessão ssh. Substitua PASSWORD pela senha de login do cluster e digite o comando:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Para enviar mensagens para o
iotout
tópico, use o seguinte comando:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Este comando não retorna ao prompt Bash normal. Em vez disso, ele envia a entrada do teclado para o
iotout
tópico.Para enviar uma mensagem para o seu dispositivo, cole um documento JSON na sessão SSH do
kafka-console-producer
.Importante
Você deve definir o valor da
"deviceId"
entrada para o ID do seu dispositivo. No exemplo a seguir, o dispositivo é chamadomyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
O esquema para este documento JSON é descrito com mais detalhes em https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Se você estiver usando o dispositivo Raspberry Pi simulado e ele estiver em execução, o dispositivo registrará a seguinte mensagem.:
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Para obter mais informações sobre como usar o conector do coletor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Próximos passos
Neste documento, você aprendeu como usar a API Apache Kafka Connect para iniciar o IoT Kafka Connector no HDInsight. Use os links a seguir para descobrir outras maneiras de trabalhar com Kafka: