Использование Apache Kafka в HDInsight с Центром Интернета вещей
Узнайте, как использовать соединитель Apache Kafka Connect Azure IoT Hub для перемещения данных между Apache Kafka в HDInsight и Центром Интернета вещей. В этом документе показано, как запускать соединитель Центра Интернета вещей из граничного узла кластера.
API Kafka Connect позволяет реализовать соединители, которые постоянно извлекают данные в Kafka или отправляет данные из Kafka в другую систему. Apache Kafka Connect Azure IoT Hub — это соединитель, который извлекает данные из Центра Интернета вещей в Apache Kafka. Он также может отправлять данные из Kafka в Центр Интернета вещей.
При извлечении из Центра Интернета вещей используется соединитель источника. При отправке в Центр Интернета вещей используется соединитель приемника. Соединитель Центра Интернета вещей может выступать как соединителем источника, так и соединителем приемника.
На следующей схеме показан поток данных между Центром Интернета вещей и Kafka в HDInsight при использовании соединителя.
Дополнительные сведения о том, как подключить API, см. в статье https://kafka.apache.org/documentation/#connect.
Необходимые компоненты
Кластер Apache Kafka в HDInsight. Дополнительные сведения см . в кратком руководстве по HdInsight в Kafka.
Граничный узел в кластере Kafka. Дополнительные сведения см. в статье "Использование пограничных узлов с документом HDInsight ".
Клиент SSH. Дополнительные сведения см. в руководстве по подключению к HDInsight (Apache Hadoop) с помощью SSH.
Центр Интернета вещей Azure и устройство. Для работы с этой статьей рекомендуется подключить онлайн-симулятор Raspberry Pi к Центру Интернета вещей Azure.
Компиляция соединителя
Скачайте исходный код соединителя из https://github.com/Azure/toketi-kafka-connect-iothub/ в локальную среду.
В командной строке перейдите в каталог
toketi-kafka-connect-iothub-master
. Для сборки и упаковки проекта выполните следующую команду:sbt assembly
Сборка занимает несколько минут. Эта команда создает файл с именем
kafka-connect-iothub-assembly_2.11-0.7.0.jar
в каталогеtoketi-kafka-connect-iothub-master\target\scala-2.11
проекта.
Установка соединителя
Отправьте JAR-файл на граничный узел кластера Kafka в HDInsight. Измените следующую команду, заменив
CLUSTERNAME
фактическое имя кластера. Значения по умолчанию для учетной записи пользователя SSH и имени пограничного узла используются для изменения по мере необходимости.scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
Когда файл будет скопирован, подключитесь к граничному узлу с помощью SSH:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Чтобы установить соединитель в каталог
libs
Kafka, используйте следующую команду:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
Для выполнения следующих шагов оставьте SSH-подключение активным.
Настройка Apache Kafka
В SSH-подключении к граничному узлу выполните следующие шаги, чтобы настроить в Kafka выполнение соединителя в автономном режиме:
Настройте переменную пароля. Замените заполнитель PASSWORD паролем для входа в кластер, а затем введите следующую команду:
export password='PASSWORD'
Установите служебную программу jq. jq упрощает обработку документов JSON, возвращаемых запросами Ambari. Введите следующую команду:
sudo apt -y install jq
Получите адреса брокеров Kafka. В вашем кластере может быть много брокеров, но вам нужно всего лишь ссылаться на один или два. Чтобы получить адрес двух узлов брокера, используйте следующую команду:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
Скопируйте эти значения для последующего использования. Возвращаемое значение аналогично приведенному ниже тексту.
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092
Получите адреса узлов Apache Zookeeper. В кластере имеется несколько узлов Zookeeper, но необходимо ссылаться только на один или два. Выполните следующую команду для сохранения этих адресов в переменную
KAFKAZKHOSTS
.export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
При запуске соединителя в автономном режиме
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
файл используется для взаимодействия с брокерами Kafka. Чтобы изменить файлconnect-standalone.properties
, используйте следующую команду:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
Внесите следующие правки:
Текущее значение Новое значение Комментарий bootstrap.servers=localhost:9092
Замените значение localhost:9092
именами узлов брокера из предыдущего шага.Настраивает конфигурацию автономного режима так, чтобы граничный узел нашел брокеры Kafka. key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Это изменение можно проверить с помощью отправителя консоли, входящего в состав Kafka. Для различных отправителей и потребителей вам могут понадобиться различные преобразователи. Дополнительные сведения об использовании других значений преобразования см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
То же, что и задано. Н/П consumer.max.poll.records=10
Добавьте это в конец файла. Это изменение предназначено для предотвращения времени ожидания в соединителе приемника, ограничивая его 10 записями за раз. Дополнительные сведения см. в разделе https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. Чтобы сохранить файл, нажмите клавиши CTRL+X, затем — Y и ВВОД.
Чтобы создать разделы для соединителя, используйте следующие команды:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
Чтобы проверить, что разделы
iotin
иiotout
существуют, используйте следующую команду:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
Раздел
iotin
используется для получения сообщений из Центра Интернета вещей. Разделiotout
используется для отправки сообщений в Центр Интернета вещей.
Получение сведений о подключении Центра Интернета вещей
Чтобы извлечь информацию о Центре Интернета вещей, используемую соединителем, выполните следующие действия:
Получите конечную точку, совместимую с центрами событий, и ее имя для Центра Интернета вещей. Чтобы получить эту информацию, используйте один из указанных ниже способов:
На портале Azure выполните указанные ниже действия.
Перейдите к Центру Интернета вещей и выберите Конечные точки.
В разделе Встроенные конечные точки выберите События.
В разделе Свойства скопируйте значения следующих полей:
- Имя, совместимое с центрами событий
- Конечная точка, совместимая с центрами событий
- Секции
Внимание
Значение конечной точки на портале может содержать лишний текст, который не требуется в этом примере. Извлеките текст, который соответствует этому шаблону:
sb://<randomnamespace>.servicebus.windows.net/
.
В интерфейсе командной строки Azure введите следующую команду:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
Замените переменную
myhubname
именем Центра Интернета вещей. В ответ вы получите примерно такой текст:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
Получите политику общего доступа и ключ. Для этого примера используйте служебный ключ. Чтобы получить эту информацию, используйте один из указанных ниже способов:
На портале Azure выполните указанные ниже действия.
- Выберите Политика общего доступа, а затем выберите службу.
- Скопируйте значение первичного ключа.
- Скопируйте значение поля Строка подключения — первичный ключ.
В интерфейсе командной строки Azure введите следующую команду:
Чтобы получить значение первичного ключа, используйте следующую команду:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
Замените переменную
myhubname
именем Центра Интернета вещей. Ответ — это первичный ключ для политикиservice
этого центра.Чтобы получить строку подключения для политики
service
, используйте следующую команду:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
Замените переменную
myhubname
именем Центра Интернета вещей. Ответом является строка подключения для политикиservice
.
Настройка подключения к источнику
Чтобы настроить источник для работы с Центром Интернета вещей, выполните следующие действия из SSH-подключения к граничному узлу:
Создайте копию файла
connect-iot-source.properties
в каталоге/usr/hdp/current/kafka-broker/config/
. Скачайте файл из проекта toketi-kafka-connect-iothub, используя следующую команду:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
Чтобы изменить файл
connect-iot-source.properties
и добавить информацию о Центре Интернета вещей, используйте следующую команду:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
В редакторе найдите и измените следующие записи:
Текущее значение Изменить Kafka.Topic=PLACEHOLDER
Замените PLACEHOLDER
наiotin
. Сообщения, полученные из Центра Интернета вещей, размещаются в разделеiotin
.IotHub.EventHubCompatibleName=PLACEHOLDER
замените PLACEHOLDER
именем, совместимым с Центрами событий.IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
замените PLACEHOLDER
конечной точкой, совместимой с Центрами событий.IotHub.AccessKeyName=PLACEHOLDER
Замените PLACEHOLDER
наservice
.IotHub.AccessKeyValue=PLACEHOLDER
замените PLACEHOLDER
первичным ключом политикиservice
.IotHub.Partitions=PLACEHOLDER
замените PLACEHOLDER
на количество секций из предыдущего шага.IotHub.StartTime=PLACEHOLDER
замените PLACEHOLDER
датой в формате UTC. Это время и дата, когда соединитель начинает проверку на наличие сообщений. Формат даты —yyyy-mm-ddThh:mm:ssZ
.BatchSize=100
Замените 100
на5
. В таком случае соединитель считывает сообщения в Kafka, когда в Центре Интернета вещей появилось пять новых сообщений.Пример конфигурации см. в статье Соединитель источника Kafka Connect для Центра Интернета вещей Azure.
Чтобы сохранить изменения, нажмите клавиши CTRL+X, затем — Y и ВВОД.
Дополнительные сведения о настройке источника соединителя см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.
Настройка подключения приемника
Чтобы настроить подключение приемника для работы с Центром Интернета вещей, выполните следующие действия из SSH-подключения к граничному узлу:
Создайте копию файла
connect-iothub-sink.properties
в каталоге/usr/hdp/current/kafka-broker/config/
. Скачайте файл из проекта toketi-kafka-connect-iothub, используя следующую команду:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
Чтобы изменить файл
connect-iothub-sink.properties
и добавить информацию о Центре Интернета вещей, используйте следующую команду:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
В редакторе найдите и измените следующие записи:
Текущее значение Изменить topics=PLACEHOLDER
Замените PLACEHOLDER
наiotout
. Сообщение, записанные в разделiotout
, переадресовываются в Центр Интернета вещей.IotHub.ConnectionString=PLACEHOLDER
замените PLACEHOLDER
строкой подключения политикиservice
.Пример конфигурации см. в статье Соединитель приемника Kafka Connect для Центра Интернета вещей Azure.
Чтобы сохранить изменения, нажмите клавиши CTRL+X, затем — Y и ВВОД.
Дополнительные сведения о настройке приемника соединителя см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Запуск соединителя источника
Чтобы запустить соединитель источника, используйте следующую команду из SSH-подключения к пограничному узлу:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
После запуска соединителя отправьте сообщения в Центр Интернета вещей со своих устройств. Когда соединитель считывает сообщения из Центра Интернета вещей и сохраняет их в разделе Kafka, он регистрирует информацию на консоли:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
Примечание.
При запуске соединителя может появиться несколько предупреждений. Эти предупреждения не вызывают проблем с получением сообщений из Центра Интернета вещей.
Через несколько минут остановите работу соединителя, дважды нажав клавиши CTRL+C. Для остановки соединителя потребуется несколько минут.
Запуск соединителя приемника
Чтобы запустить соединитель приемника в автономном режиме, используйте следующую команду из SSH-подключения к граничному узлу:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
После запуска соединителя отобразится похожая информация:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
Примечание.
При запуске соединителя может появиться несколько предупреждений. Эти предупреждения можно игнорировать.
Отправка сообщений
Для отправки сообщений через соединитель, выполните следующие действия:
Запустите второй сеанс SSH-подключения к кластеру Kafka:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
Получите адреса брокеров Kafka для нового SSH-подключения. Замените заполнитель PASSWORD паролем для входа в кластер, а затем введите следующую команду:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
Для отправки сообщений в раздел
iotout
используйте следующую команду:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
Эта команда не возвращает управление в командную строку Bash. Вместо этого он отправляет ввод с клавиатуры в раздел
iotout
.Чтобы отправить сообщение на устройство, вставьте документ JSON в сеанс SSH для
kafka-console-producer
.Внимание
Для записи
"deviceId"
необходимо задать идентификатор устройства. В следующем примере устройство называетсяmyDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
Схема для этого документа JSON более подробно описана в репозитории https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Если вы используете имитированное устройство Raspberry Pi и выполняете его, устройство записывает следующее сообщение.
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
Дополнительные сведения об использовании соединителя приемника см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
Следующие шаги
Из этого документа вы узнали, как использовать API Apache Kafka Connect для запуска соединителя IoT Kafka в HDInsight. Другие материалы, посвященные работе с Kafka, доступны по следующим ссылкам: