Use o Apache Kafka no HDInsight com o Microsoft Azure Hub IoT
Aprenda como usar o conector do Hub IoT do Azure do Apache Kafka Connect para mover dados entre o Apache Kafka no HDInsight e o Azure IoT Hub. Neste documento, você aprenderá a executar o conector do Hub IoT de um nó de borda do cluster.
A API do Kafka Connect permite que você implemente os conectores que continuamente extraem dados para Kafka ou enviam dados do Kafka para outro sistema. O Hub Apache Kafka Connect do Azure IoT é um conector que extrai dados do Hub do Azure IoT para o Kafka. Também pode enviar dados do Kafka para o Hub IoT.
Quando efetuar pull do Hub IoT, você usa um conector de fonte. Quando efetuar pull do Hub IoT, você usa um conector de coletor. O conector de Hub IoT fornece a origem e os conectores de coletor.
O diagrama a seguir mostra o fluxo de dados entre o Hub IoT do Microsoft zure e Kafka no HDInsight ao usar o conector.
Para obter mais informações sobre como conectar a API, consulte https://kafka.apache.org/documentation/#connect.
Pré-requisitos
Um cluster do Apache Kafka no HDInsight. Para obter mais informações, consulte o documento Início rápido do Kafka no HDInsight.
Um nó de borda do cluster Kafka. Para saber mais, consulte o documento Usar nós de borda com o HDInsight.
Um cliente SSH. Para saber mais, confira Conectar-se ao HDInsight (Apache Hadoop) usando SSH.
Um dispositivo e um Hub IoT do Azure. Para este artigo, considere usar o Simulador online do Connect Raspberry Pi para o Hub IoT do Azure.
Compilar o conector
Baixe a fonte para o conector em https://github.com/Azure/toketi-kafka-connect-iothub/ para seu ambiente local.
Em um prompt de comando, navegue até o diretório
toketi-kafka-connect-iothub-master
. Em seguida, use o seguinte comando para criar e empacotar o projeto:sbt assembly
A compilação leva alguns minutos para ser concluído. Esse comando cria um arquivo chamado
kafka-connect-iothub-assembly_2.11-0.7.0.jar
no diretóriotoketi-kafka-connect-iothub-master\target\scala-2.11
do projeto.
Instalar o conector
Carregue o arquivo .jar no nó de borda do Kafka no cluster do HDInsight. Edite o comando a seguir substituindo
CLUSTERNAME
pelo nome real do cluster. Os valores padrão para a conta de usuário SSH e o nome do nó de borda são usados para modificar conforme necessário.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Uma vez concluída a cópia do arquivo, conecte-se ao nó de borda usando SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Para instalar o conector ao diretório do Kafka
libs
, use o seguinte comando:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Mantenha a conexão SSH ativa para as etapas restantes.
Configure Apache Kafka
De uma conexão SSH para o nó de borda, use as etapas a seguir para configurar o Kafka para executar o conector no modo autônomo:
Configurar variável de senha. Substitua PASSWORD pela senha de logon do cluster e insira o comando:
export password='PASSWORD'
Instale o utilitário jq. O utilitário jq torna mais fácil processar documentos JSON retornados de consultas do Ambari. Insira o seguinte comando:
sudo apt -y install jq
Coletar endereço IP do agente do Kafka. Pode haver muitos agentes no seu cluster, mas você só precisa fazer referência a um ou dois. Para obter o endereço dos dois hosts de agente, use o seguinte comando:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Copie os valores para uso posterior. O valor retornado é semelhante ao seguinte texto:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Obter o endereço de nós de Apache Zookeeper. Há vários nós do Zookeeper no cluster, mas você só precisa referenciar um ou dois. Use o seguinte comando para armazenar os endereços na variável
KAFKAZKHOSTS
:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
Ao executar o conector no modo autônomo, o
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
arquivo é usado para se comunicar com os agentes de Kafka. Para editar esse arquivoconnect-standalone.properties
, use o seguinte comando:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Faça as seguintes edições:
Valor atual Novo valor Comentário bootstrap.servers=localhost:9092
Substitua o valor localhost:9092
pelos hosts de agente da etapa anteriorDefine a configuração autônoma para o nó de borda para localizar os agentes Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Essa alteração permite que você teste usando o produtor do console incluído com Kafka. Talvez seja necessário conversores diferentes para outros produtores e consumidores. Para obter informações sobre como usar outros valores de conversor, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
O mesmo que foi dado. N/D consumer.max.poll.records=10
Adicione ao final do arquivo. Essa alteração é evitar os tempos limite no conector do coletor limitando a 10 registros por vez. Para obter mais informações, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Para salvar o arquivo, use Ctrl + X, Y e, em seguida, Enter.
Para criar os tópicos usados pela conector use os comandos a seguir:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Para verificar que os tópicos
iotin
eiotout
existem, use o comando a seguir:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
O tópico
iotin
é usado para receber mensagens de Hub IoT. O tópicoiotout
é usado para enviar mensagens de Hub IoT.
Obter a cadeia de conexão do Hub IoT
Para recuperar informações de Hub IoT usadas pelo conector, use as seguintes etapas:
Obtenha o ponto de extremidade compatível com o evento de Hub e nome de ponto de extremidade compatível para o seu Hub IoT. Para obter essas informações, use um dos métodos a seguir:
Do portal do Azure, use as etapas a seguir:
Navegue até seu Hub IoT e clique em Pontos de Extremidade.
Em Pontos de extremidade internos, selecione Eventos.
De Propriedades, copie o valor dos campos a seguir:
- Nome compatível com o Hub de eventos
- Ponto de extremidade compatível com o hub de eventos
- Partições
Importante
O valor de ponto de extremidade do portal pode conter texto extra que não é necessário neste exemplo. Extrair o texto que corresponde a esse padrão
sb://<randomnamespace>.servicebus.windows.net/
.
Na CLI do Azure digite o seguinte comando:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Substitua
myhubname
pelo nome do seu Hub IoT. A resposta é semelhante ao texto a seguir:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Obtenha a política de acesso compartilhado e chave. Para este exemplo, use a chave de serviço. Para obter essas informações, use um dos métodos a seguir:
Do portal do Azure, use as etapas a seguir:
- Selecione Políticas de acesso compartilhado e selecione serviço.
- Copie o valor da chave primária.
- Copie o valor de Cadeia de conexão – chave primária.
Na CLI do Azure digite o seguinte comando:
Para obter o valor de chave primária, use o seguinte comando:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Substitua
myhubname
pelo nome do seu Hub IoT. A resposta é a chave primária para oservice
política para esse hub.Para obter a cadeia de conexão para a
service
política, use o seguinte comando:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Substitua
myhubname
pelo nome do seu Hub IoT. A resposta e a string de conexão para a políticaservice
.
Configurar a conexão da fonte
Para configurar a fonte para trabalhar com o Hub IoT, execute as seguintes ações de uma conexão SSH para o nó de borda:
Criar uma cópia do arquivo
connect-iot-source.properties
no/usr/hdp/current/kafka-broker/config/
diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Para editar o arquivo
connect-iot-source.properties
e adicionar as informações de Hub IoT, use o seguinte comando:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
No editor, localize e altere as seguintes entradas:
Valor atual Editar Kafka.Topic=PLACEHOLDER
Substitua PLACEHOLDER
poriotin
. Mensagens recebidas do Hub IoT são colocadas noiotin
tópico.IotHub.EventHubCompatibleName=PLACEHOLDER
Substitua PLACEHOLDER
pelo Hub de Eventos - nome compatível.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
Substitua PLACEHOLDER
pelo Hub de Eventos - ponto de extremidade compatível.IotHub.AccessKeyName=PLACEHOLDER
Substitua PLACEHOLDER
porservice
.IotHub.AccessKeyValue=PLACEHOLDER
Substitua PLACEHOLDER
pela chave primária daservice
política.IotHub.Partitions=PLACEHOLDER
Substitua PLACEHOLDER
pelo número de partições nas etapas anteriores.IotHub.StartTime=PLACEHOLDER
Substitua PLACEHOLDER
por uma data UTC. Esta data é quando o conector inicia a verificação de mensagens. O formato de data éyyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Substitua 100
por5
. Essa alteração faz com que o conector lei as mensagens em Kafka, uma vez que há cinco novas mensagens no Hub IoT.Para obter um exemplo de configuração, veja Conector de origem do Kafka Connect para o Hub IoT do Azure.
Para salvar as alterações, use Ctrl + X, Y e, em seguida, Enter.
Para obter mais informações sobre como configurar a fonte do connector, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Configurar a conexão do coletor
Para configurar a conexão do coletor para trabalhar com o Hub IoT, execute as seguintes ações de uma conexão SSH para o nó de borda:
Criar uma cópia do arquivo
connect-iothub-sink.properties
no/usr/hdp/current/kafka-broker/config/
diretório. Para baixar o arquivo do projeto toketi-kafka-connect-iothub, use o seguinte comando:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Para editar o arquivo
connect-iothub-sink.properties
e adicionar as informações de Hub IoT, use o seguinte comando:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
No editor, localize e altere as seguintes entradas:
Valor atual Editar topics=PLACEHOLDER
Substitua PLACEHOLDER
poriotout
. Mensagens gravadas noiotout
tópico são encaminhadas para o Hub IoT.IotHub.ConnectionString=PLACEHOLDER
Substitua PLACEHOLDER
pela cadeia de conexão obtida para aservice
política.Para obter um exemplo de configuração, veja Conector do coletor do Kafka Connect para o Hub IoT do Azure.
Para salvar as alterações, use Ctrl + X, Y e, em seguida, Enter.
Para obter mais informações sobre como configurar a fonte do conector, consulte https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Iniciar o conector de origem
Para iniciar o conector de origem, use o comando a seguir a partir de uma conexão SSH para o nó de borda:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
Depois que o conector é iniciado, envie mensagens para o Hub IoT do seu dispositivo. Como o conector lê mensagens do Hub IoT e os armazena no tópico Kafka, ele registra informações no console:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Observação
Você pode ver avisos que o conector for iniciado. Esses avisos não causam problemas para receber mensagens de hub IoT.
Pare o conector após alguns minutos usando Ctrl + C duas vezes. Leva alguns minutos para que o conector pare.
Iniciar o coletor do conector
Para uma conexão SSH ao nó de conexão, use o comando a seguir para iniciar o conector do coletor no modo autônomo:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
Informações semelhantes ao seguinte texto serão exibidas são exibidas:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Observação
Você pode ver vários avisos que o conector foi iniciado. Você pode ignorá-los com segurança.
Enviar mensagens
Para enviar mensagens por meio do conector, use as seguintes etapas:
Abra uma segunda sessão SSH para o cluster Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Obtenha o endereço dos agentes Kafka para a nova sessão SSH. Substitua PASSWORD pela senha de logon do cluster e insira o comando:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Para enviar mensagens ao tópico
iotout
, use o comando a seguir:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Esse comando não retorna ao prompt do Bash normal. Em vez disso, envia a entrada do teclado para o tópico
iotout
.Para enviar uma mensagem para o seu dispositivo, colar um documento JSON para a sessão SSH para a
kafka-console-producer
.Importante
Você deve definir o valor de
"deviceId"
entrada para a ID do dispositivo. O exemplo a seguir, o dispositivo é denominadomyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
O esquema para este documento JSON é descrito mais detalhadamente no https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Se você estiver usando o dispositivo Raspberry Pi simulado e ele estiver em execução, o dispositivo registrará a seguinte mensagem.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Para obter mais informações sobre como usar o conector do coletor, confira https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Próximas etapas
Neste documento, você aprendeu a usar a API do Apache Kafka Connect para iniciar o IoT Kafka Connector no HDInsight. Use os links a seguir para descobrir outras maneiras de trabalhar com Kafka: