Поделиться через


Использование Apache Kafka в HDInsight с Центром Интернета вещей

Узнайте, как использовать соединитель Apache Kafka Connect Azure IoT Hub для перемещения данных между Apache Kafka в HDInsight и Центром Интернета вещей. В этом документе показано, как запускать соединитель Центра Интернета вещей из граничного узла кластера.

API Kafka Connect позволяет реализовать соединители, которые постоянно извлекают данные в Kafka или отправляет данные из Kafka в другую систему. Apache Kafka Connect Azure IoT Hub — это соединитель, который извлекает данные из Центра Интернета вещей в Apache Kafka. Он также может отправлять данные из Kafka в Центр Интернета вещей.

При извлечении из Центра Интернета вещей используется соединитель источника. При отправке в Центр Интернета вещей используется соединитель приемника. Соединитель Центра Интернета вещей может выступать как соединителем источника, так и соединителем приемника.

На следующей схеме показан поток данных между Центром Интернета вещей и Kafka в HDInsight при использовании соединителя.

Изображение, показывающее поток данных из Центр Интернета вещей в Kafka через соединитель.

Дополнительные сведения о том, как подключить API, см. в статье https://kafka.apache.org/documentation/#connect.

Необходимые компоненты

Компиляция соединителя

  1. Скачайте исходный код соединителя из https://github.com/Azure/toketi-kafka-connect-iothub/ в локальную среду.

  2. В командной строке перейдите в каталог toketi-kafka-connect-iothub-master. Для сборки и упаковки проекта выполните следующую команду:

    sbt assembly
    

    Сборка занимает несколько минут. Эта команда создает файл с именем kafka-connect-iothub-assembly_2.11-0.7.0.jar в каталоге toketi-kafka-connect-iothub-master\target\scala-2.11 проекта.

Установка соединителя

  1. Отправьте JAR-файл на граничный узел кластера Kafka в HDInsight. Измените следующую команду, заменив CLUSTERNAME фактическое имя кластера. Значения по умолчанию для учетной записи пользователя SSH и имени пограничного узла используются для изменения по мере необходимости.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net:
    
  2. Когда файл будет скопирован, подключитесь к граничному узлу с помощью SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Чтобы установить соединитель в каталог libs Kafka, используйте следующую команду:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Для выполнения следующих шагов оставьте SSH-подключение активным.

Настройка Apache Kafka

В SSH-подключении к граничному узлу выполните следующие шаги, чтобы настроить в Kafka выполнение соединителя в автономном режиме:

  1. Настройте переменную пароля. Замените заполнитель PASSWORD паролем для входа в кластер, а затем введите следующую команду:

    export password='PASSWORD'
    
  2. Установите служебную программу jq. jq упрощает обработку документов JSON, возвращаемых запросами Ambari. Введите следующую команду:

    sudo apt -y install jq
    
  3. Получите адреса брокеров Kafka. В вашем кластере может быть много брокеров, но вам нужно всего лишь ссылаться на один или два. Чтобы получить адрес двух узлов брокера, используйте следующую команду:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Скопируйте эти значения для последующего использования. Возвращаемое значение аналогично приведенному ниже тексту.

    <brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.cloudapp.net:9092

  4. Получите адреса узлов Apache Zookeeper. В кластере имеется несколько узлов Zookeeper, но необходимо ссылаться только на один или два. Выполните следующую команду для сохранения этих адресов в переменную KAFKAZKHOSTS.

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
  5. При запуске соединителя в автономном режиме /usr/hdp/current/kafka-broker/config/connect-standalone.properties файл используется для взаимодействия с брокерами Kafka. Чтобы изменить файл connect-standalone.properties, используйте следующую команду:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Внесите следующие правки:

    Текущее значение Новое значение Комментарий
    bootstrap.servers=localhost:9092 Замените значение localhost:9092 именами узлов брокера из предыдущего шага. Настраивает конфигурацию автономного режима так, чтобы граничный узел нашел брокеры Kafka.
    key.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter Это изменение можно проверить с помощью отправителя консоли, входящего в состав Kafka. Для различных отправителей и потребителей вам могут понадобиться различные преобразователи. Дополнительные сведения об использовании других значений преобразования см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
    value.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.storage.StringConverter То же, что и задано.
    Н/П consumer.max.poll.records=10 Добавьте это в конец файла. Это изменение предназначено для предотвращения времени ожидания в соединителе приемника, ограничивая его 10 записями за раз. Дополнительные сведения см. в разделе https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.
  7. Чтобы сохранить файл, нажмите клавиши CTRL+X, затем — Y и ВВОД.

  8. Чтобы создать разделы для соединителя, используйте следующие команды:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    Чтобы проверить, что разделы iotin и iotout существуют, используйте следующую команду:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    Раздел iotin используется для получения сообщений из Центра Интернета вещей. Раздел iotout используется для отправки сообщений в Центр Интернета вещей.

Получение сведений о подключении Центра Интернета вещей

Чтобы извлечь информацию о Центре Интернета вещей, используемую соединителем, выполните следующие действия:

  1. Получите конечную точку, совместимую с центрами событий, и ее имя для Центра Интернета вещей. Чтобы получить эту информацию, используйте один из указанных ниже способов:

    • На портале Azure выполните указанные ниже действия.

      1. Перейдите к Центру Интернета вещей и выберите Конечные точки.

      2. В разделе Встроенные конечные точки выберите События.

      3. В разделе Свойства скопируйте значения следующих полей:

        • Имя, совместимое с центрами событий
        • Конечная точка, совместимая с центрами событий
        • Секции

        Внимание

        Значение конечной точки на портале может содержать лишний текст, который не требуется в этом примере. Извлеките текст, который соответствует этому шаблону: sb://<randomnamespace>.servicebus.windows.net/.

    • В интерфейсе командной строки Azure введите следующую команду:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Замените переменную myhubname именем Центра Интернета вещей. В ответ вы получите примерно такой текст:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.windows.net/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Получите политику общего доступа и ключ. Для этого примера используйте служебный ключ. Чтобы получить эту информацию, используйте один из указанных ниже способов:

    • На портале Azure выполните указанные ниже действия.

      1. Выберите Политика общего доступа, а затем выберите службу.
      2. Скопируйте значение первичного ключа.
      3. Скопируйте значение поля Строка подключения — первичный ключ.
    • В интерфейсе командной строки Azure введите следующую команду:

      1. Чтобы получить значение первичного ключа, используйте следующую команду:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Замените переменную myhubname именем Центра Интернета вещей. Ответ — это первичный ключ для политики service этого центра.

      2. Чтобы получить строку подключения для политики service, используйте следующую команду:

        az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
        

        Замените переменную myhubname именем Центра Интернета вещей. Ответом является строка подключения для политики service.

Настройка подключения к источнику

Чтобы настроить источник для работы с Центром Интернета вещей, выполните следующие действия из SSH-подключения к граничному узлу:

  1. Создайте копию файла connect-iot-source.properties в каталоге /usr/hdp/current/kafka-broker/config/. Скачайте файл из проекта toketi-kafka-connect-iothub, используя следующую команду:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. Чтобы изменить файл connect-iot-source.properties и добавить информацию о Центре Интернета вещей, используйте следующую команду:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    
  3. В редакторе найдите и измените следующие записи:

    Текущее значение Изменить
    Kafka.Topic=PLACEHOLDER Замените PLACEHOLDER на iotin. Сообщения, полученные из Центра Интернета вещей, размещаются в разделе iotin.
    IotHub.EventHubCompatibleName=PLACEHOLDER замените PLACEHOLDER именем, совместимым с Центрами событий.
    IotHub.EventHubCompatibleEndpoint=PLACEHOLDER замените PLACEHOLDER конечной точкой, совместимой с Центрами событий.
    IotHub.AccessKeyName=PLACEHOLDER Замените PLACEHOLDER на service.
    IotHub.AccessKeyValue=PLACEHOLDER замените PLACEHOLDER первичным ключом политики service.
    IotHub.Partitions=PLACEHOLDER замените PLACEHOLDER на количество секций из предыдущего шага.
    IotHub.StartTime=PLACEHOLDER замените PLACEHOLDER датой в формате UTC. Это время и дата, когда соединитель начинает проверку на наличие сообщений. Формат даты — yyyy-mm-ddThh:mm:ssZ.
    BatchSize=100 Замените 100 на 5. В таком случае соединитель считывает сообщения в Kafka, когда в Центре Интернета вещей появилось пять новых сообщений.

    Пример конфигурации см. в статье Соединитель источника Kafka Connect для Центра Интернета вещей Azure.

  4. Чтобы сохранить изменения, нажмите клавиши CTRL+X, затем — Y и ВВОД.

Дополнительные сведения о настройке источника соединителя см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Настройка подключения приемника

Чтобы настроить подключение приемника для работы с Центром Интернета вещей, выполните следующие действия из SSH-подключения к граничному узлу:

  1. Создайте копию файла connect-iothub-sink.properties в каталоге /usr/hdp/current/kafka-broker/config/. Скачайте файл из проекта toketi-kafka-connect-iothub, используя следующую команду:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. Чтобы изменить файл connect-iothub-sink.properties и добавить информацию о Центре Интернета вещей, используйте следующую команду:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    
  3. В редакторе найдите и измените следующие записи:

    Текущее значение Изменить
    topics=PLACEHOLDER Замените PLACEHOLDER на iotout. Сообщение, записанные в раздел iotout, переадресовываются в Центр Интернета вещей.
    IotHub.ConnectionString=PLACEHOLDER замените PLACEHOLDER строкой подключения политики service.

    Пример конфигурации см. в статье Соединитель приемника Kafka Connect для Центра Интернета вещей Azure.

  4. Чтобы сохранить изменения, нажмите клавиши CTRL+X, затем — Y и ВВОД.

Дополнительные сведения о настройке приемника соединителя см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Запуск соединителя источника

  1. Чтобы запустить соединитель источника, используйте следующую команду из SSH-подключения к пограничному узлу:

    /usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    После запуска соединителя отправьте сообщения в Центр Интернета вещей со своих устройств. Когда соединитель считывает сообщения из Центра Интернета вещей и сохраняет их в разделе Kafka, он регистрирует информацию на консоли:

    [2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
    [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
    

    Примечание.

    При запуске соединителя может появиться несколько предупреждений. Эти предупреждения не вызывают проблем с получением сообщений из Центра Интернета вещей.

  2. Через несколько минут остановите работу соединителя, дважды нажав клавиши CTRL+C. Для остановки соединителя потребуется несколько минут.

Запуск соединителя приемника

Чтобы запустить соединитель приемника в автономном режиме, используйте следующую команду из SSH-подключения к граничному узлу:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

После запуска соединителя отобразится похожая информация:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Примечание.

При запуске соединителя может появиться несколько предупреждений. Эти предупреждения можно игнорировать.

Отправка сообщений

Для отправки сообщений через соединитель, выполните следующие действия:

  1. Запустите второй сеанс SSH-подключения к кластеру Kafka:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Получите адреса брокеров Kafka для нового SSH-подключения. Замените заполнитель PASSWORD паролем для входа в кластер, а затем введите следующую команду:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. Для отправки сообщений в раздел iotout используйте следующую команду:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    Эта команда не возвращает управление в командную строку Bash. Вместо этого он отправляет ввод с клавиатуры в раздел iotout.

  4. Чтобы отправить сообщение на устройство, вставьте документ JSON в сеанс SSH для kafka-console-producer.

    Внимание

    Для записи "deviceId" необходимо задать идентификатор устройства. В следующем примере устройство называется myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    Схема для этого документа JSON более подробно описана в репозитории https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Если вы используете имитированное устройство Raspberry Pi и выполняете его, устройство записывает следующее сообщение.

Receive message: Turn On


Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.

Дополнительные сведения об использовании соединителя приемника см. по адресу https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Следующие шаги

Из этого документа вы узнали, как использовать API Apache Kafka Connect для запуска соединителя IoT Kafka в HDInsight. Другие материалы, посвященные работе с Kafka, доступны по следующим ссылкам: