Настройка потоков данных в операциях Интернета вещей Azure
Внимание
На этой странице содержатся инструкции по управлению компонентами Операций Интернета вещей Azure с помощью манифестов развертывания Kubernetes, которые доступны в предварительной версии. Эта функция предоставляется с несколькими ограничениями и не должна использоваться для рабочих нагрузок.
Юридические условия, применимые к функциям Azure, которые находятся в состоянии бета-версии, предварительной версии или иным образом еще не выпущены в общедоступной версии, см. на странице Дополнительные условия использования предварительных версий в Microsoft Azure.
Поток данных — это путь, который данные принимают из источника в место назначения с необязательными преобразованиями. Поток данных можно настроить, создав пользовательский ресурс потока данных или используя портал Azure IoT Operations Studio. Поток данных состоит из трех частей: источника, преобразования и назначения.
Чтобы определить источник и назначение, необходимо настроить конечные точки потока данных. Преобразование является необязательным и может включать такие операции, как обогащение данных, фильтрация данных и сопоставление данных с другим полем.
Внимание
Каждый поток данных должен иметь локальную конечную точку брокера MQTT Azure IoT по умолчанию в качестве источника или назначения.
Вы можете использовать интерфейс операций в операциях Интернета вещей Azure для создания потока данных. Интерфейс операций предоставляет визуальный интерфейс для настройки потока данных. Вы также можете использовать Bicep для создания потока данных с помощью файла шаблона Bicep или использовать Kubernetes для создания потока данных с помощью YAML-файла.
Продолжайте чтение, чтобы узнать, как настроить источник, преобразование и назначение.
Необходимые компоненты
Вы можете развернуть потоки данных сразу после того, как у вас есть экземпляр операций Интернета вещей Azure, используя профиль потока данных по умолчанию и конечную точку. Однако может потребоваться настроить профили потоков данных и конечные точки для настройки потока данных.
Профиль потока данных
Если для потоков данных не нужны разные параметры масштабирования, используйте профиль потока данных по умолчанию, предоставляемый операциями Интернета вещей Azure. Сведения о настройке профиля потока данных см. в разделе "Настройка профилей потоков данных".
Конечные точки потока данных
Конечные точки потока данных необходимы для настройки источника и назначения для потока данных. Чтобы быстро приступить к работе, можно использовать конечную точку потока данных по умолчанию для локального брокера MQTT. Вы также можете создавать другие типы конечных точек потока данных, таких как Kafka, Центры событий или Azure Data Lake Storage. Сведения о настройке каждого типа конечной точки потока данных см. в разделе "Настройка конечных точек потока данных".
Начало работы
После получения необходимых компонентов можно приступить к созданию потока данных.
Чтобы создать поток данных в операциях, выберите "Создать поток> данных". Затем вы увидите страницу, на которой можно настроить источник, преобразование и назначение для потока данных.
Ознакомьтесь со следующими разделами, чтобы узнать, как настроить типы операций потока данных.
Исходный код
Чтобы настроить источник для потока данных, укажите ссылку на конечную точку и список источников данных для конечной точки. Выберите один из следующих параметров в качестве источника для потока данных.
Если конечная точка по умолчанию не используется в качестве источника, она должна использоваться в качестве назначения. Дополнительные сведения см. в статье "Потоки данных" должны использовать локальную конечную точку брокера MQTT.
Вариант 1. Использование конечной точки брокера сообщений по умолчанию в качестве источника
В разделе "Исходные сведения" выберите брокер сообщений.
Введите следующие параметры для источника брокера сообщений:
Параметр Description Конечная точка потока данных Выберите по умолчанию конечную точку брокера сообщений MQTT по умолчанию . Раздел Фильтр раздела для подписки на входящие сообщения. См. статью "Настройка MQTT" или "Kafka". Схема сообщений Схема, используемая для десериализации входящих сообщений. См. раздел "Указание схемы для десериализации данных". Выберите Применить.
Вариант 2. Использование ресурса в качестве источника
Ресурс можно использовать в качестве источника потока данных. Использование ресурса в качестве источника доступно только в интерфейсе операций.
В разделе "Исходные сведения" выберите "Ресурс".
Выберите ресурс, который вы хотите использовать в качестве исходной конечной точки.
Выберите Продолжить.
Отображается список точек данных для выбранного ресурса.
Выберите "Применить" , чтобы использовать ресурс в качестве исходной конечной точки.
При использовании ресурса в качестве источника определение ресурса используется для вывода схемы потока данных. Определение ресурса включает схему для точек данных ресурса. Дополнительные сведения см. в статье "Удаленное управление конфигурациями ресурсов".
После настройки данные из ресурса достигают потока данных через локальный брокер MQTT. Таким образом, при использовании ресурса в качестве источника поток данных использует локальную конечную точку брокера MQTT по умолчанию в качестве источника в действительности.
Вариант 3. Использование пользовательской конечной точки потока данных MQTT или Kafka в качестве источника
Если вы создали пользовательскую конечную точку потока данных MQTT или Kafka (например, для использования с сеткой событий или Центрами событий), ее можно использовать в качестве источника для потока данных. Помните, что конечные точки типа хранилища, такие как Data Lake или Fabric OneLake, нельзя использовать в качестве источника.
В разделе "Исходные сведения" выберите брокер сообщений.
Введите следующие параметры для источника брокера сообщений:
Параметр Description Конечная точка потока данных Нажмите кнопку повторного выбора , чтобы выбрать пользовательскую конечную точку потока данных MQTT или Kafka. Дополнительные сведения см. в разделе "Настройка конечных точек потока данных MQTT" или "Настройка конечных точек потока данных" Центры событий Azure и Kafka. Раздел Фильтр раздела для подписки на входящие сообщения. См. статью "Настройка MQTT" или "Kafka". Схема сообщений Схема, используемая для десериализации входящих сообщений. См. раздел "Указание схемы для десериализации данных". Выберите Применить.
Настройка источников данных (разделы MQTT или Kafka)
Можно указать несколько разделов MQTT или Kafka в источнике, не изменив конфигурацию конечной точки потока данных. Такая гибкость означает, что одна и та же конечная точка может использоваться повторно в нескольких потоках данных, даже если разделы различаются. Дополнительные сведения см. в разделе "Повторное использование конечных точек потока данных".
Темы MQTT
Если источником является конечная точка MQTT (включенная сетка событий), можно использовать фильтр раздела MQTT для подписки на входящие сообщения. Фильтр разделов может включать подстановочные знаки для подписки на несколько разделов. Например, thermostats/+/telemetry/temperature/#
подписывается на все сообщения телеметрии температуры из термостатов. Чтобы настроить фильтры раздела MQTT, выполните следующие действия.
В сведениях о источнике потока данных для операций выберите брокер сообщений, а затем используйте поле "Раздел", чтобы указать фильтр раздела MQTT для подписки на входящие сообщения.
Примечание.
В операциях можно указать только один фильтр разделов. Чтобы использовать несколько фильтров разделов, используйте Bicep или Kubernetes.
Общие подписки
Чтобы использовать общие подписки с источниками брокера сообщений, можно указать раздел общей подписки в виде $shared/<GROUP_NAME>/<TOPIC_FILTER>
.
В операциях с сведениями о источнике потока данных выберите брокер сообщений и используйте поле "Раздел", чтобы указать общую группу подписок и раздел.
Если число экземпляров в профиле потока данных больше одного, общая подписка автоматически включается для всех потоков данных, использующих источник брокера сообщений. В этом случае $shared
добавляется префикс и имя общей группы подписок автоматически создается. Например, если у вас есть профиль потока данных с числом экземпляров 3, а поток данных использует конечную точку брокера сообщений в качестве источника, настроенного с topic1
разделами, и topic2
они автоматически преобразуются в общие подписки как $shared/<GENERATED_GROUP_NAME>/topic1
и $shared/<GENERATED_GROUP_NAME>/topic2
.
Вы можете явно создать раздел с именем $shared/mygroup/topic
в конфигурации. Однако явное добавление $shared
раздела не рекомендуется, так как $shared
префикс автоматически добавляется при необходимости. Потоки данных могут выполнять оптимизацию с именем группы, если она не задана. Например, не задано, $share
а потоки данных работают только над именем раздела.
Внимание
Потоки данных, требующие общей подписки, когда число экземпляров больше одного важно при использовании брокера MQTT Сетки событий в качестве источника, так как он не поддерживает общие подписки. Чтобы избежать отсутствия сообщений, задайте для экземпляра профиля потока данных значение одного при использовании брокера MQTT Сетки событий в качестве источника. Это происходит, когда поток данных является подписчиком и получает сообщения из облака.
Темы Kafka
Если источником является конечная точка Kafka (включенные Центры событий), укажите отдельные разделы Kafka, которые нужно подписаться на входящие сообщения. Подстановочные знаки не поддерживаются, поэтому необходимо указать каждый раздел статически.
Примечание.
При использовании Центров событий через конечную точку Kafka каждый отдельный концентратор событий в пространстве имен — это раздел Kafka. Например, если у вас есть пространство имен Центров событий с двумя концентраторами событий и thermostats
humidifiers
можно указать каждый концентратор событий в качестве раздела Kafka.
Чтобы настроить разделы Kafka, выполните следующие действия.
В сведениях о источнике потока данных операций выберите брокер сообщений, а затем используйте поле "Раздел", чтобы указать фильтр раздела Kafka для подписки на входящие сообщения.
Примечание.
В операциях можно указать только один фильтр разделов. Чтобы использовать несколько фильтров разделов, используйте Bicep или Kubernetes.
Указание исходной схемы
При использовании MQTT или Kafka в качестве источника можно указать схему для отображения списка точек данных на портале операций. Использование схемы для десериализации и проверки входящих сообщений в настоящее время не поддерживается.
Если источник является ресурсом, схема автоматически выводится из определения ресурса.
Совет
Чтобы создать схему из примера файла данных, используйте вспомогательный элемент schema Gen.
Чтобы настроить схему, используемую для десериализации входящих сообщений из источника:
В операциях с сведениями о источнике потока данных выберите брокер сообщений и используйте поле схемы сообщения, чтобы указать схему. Для отправки файла схемы можно использовать кнопку "Отправить ". Дополнительные сведения см. в статье "Общие сведения о схемах сообщений".
Дополнительные сведения см. в статье "Общие сведения о схемах сообщений".
Преобразование
Операция преобразования заключается в том, что перед отправкой данных в место назначения можно преобразовать данные из источника. Преобразования являются необязательными. Если вам не нужно вносить изменения в данные, не включайте операцию преобразования в конфигурацию потока данных. Несколько преобразований объединяются на этапах независимо от порядка, в котором они указаны в конфигурации. Порядок этапов всегда:
- Обогащение. Добавьте дополнительные данные в исходные данные, заданные набором данных и условием для сопоставления.
- Фильтр. Фильтрация данных на основе условия.
- Сопоставление, вычисление, переименование или добавление нового свойства: перемещение данных из одного поля в другое с необязательным преобразованием.
В этом разделе приведены общие сведения о преобразованиях потока данных. Дополнительные сведения см. в разделе "Сопоставление данных с помощью потоков данных", "Преобразование данных с помощью преобразований потоков данных" и "Обогащение данных с помощью потоков данных".
В интерфейсе операций выберите "Добавить преобразование потока>данных" (необязательно).
Обогащение: добавление ссылочных данных
Чтобы обогатить данные, сначала добавьте ссылочный набор данных в хранилище состояний операций Интернета вещей Azure. Набор данных используется для добавления дополнительных данных в исходные данные на основе условия. Условие указывается в качестве поля в исходных данных, которые соответствуют полю в наборе данных.
Образец данных можно загрузить в хранилище состояний с помощью интерфейса командной строки хранилища состояний. Имена ключей в хранилище состояний соответствуют набору данных в конфигурации потока данных.
В настоящее время этап обогащения не поддерживается в интерфейсе операций.
Если набор данных содержит запись с asset
полем, аналогично:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
Данные из источника с deviceId
сопоставлением thermostat1
полей имеют location
manufacturer
поля, доступные на этапах фильтрации и сопоставления.
Дополнительные сведения о синтаксисе условий см. в разделе "Обогащение данных с помощью потоков данных" и "Преобразование данных с помощью потоков данных".
Фильтр: фильтрация данных на основе условия
Чтобы отфильтровать данные по условию filter
, можно использовать этап. Условие указывается в качестве поля в исходных данных, которые соответствуют значению.
В разделе "Преобразование( необязательно)" выберите "Добавить фильтр>".
Введите необходимые параметры.
Параметр Description Условие фильтра Условие фильтрации данных на основе поля в исходных данных. Description Укажите описание условия фильтра. В поле условия фильтра введите
@
или нажмите клавиши CTRL+ПРОБЕЛ , чтобы выбрать точки данных из раскрывающегося списка.Вы можете ввести свойства метаданных MQTT с помощью формата
@$metadata.user_properties.<property>
или@$metadata.topic
. Вы также можете ввести заголовки $metadata с помощью формата@$metadata.<header>
. Синтаксис$metadata
необходим только для свойств MQTT, входящих в заголовок сообщения. Дополнительные сведения см. в ссылках на поля.Условие может использовать поля в исходных данных. Например, можно использовать условие фильтра, например
@temperature > 20
фильтрацию данных меньше или равно 20 на основе поля температуры.Выберите Применить.
Карта: перемещение данных из одного поля в другое
Чтобы сопоставить данные с другим полем с необязательным преобразованием map
, можно использовать операцию. Преобразование указывается в виде формулы, которая использует поля в исходных данных.
В интерфейсе операций сопоставление в настоящее время поддерживается с помощью преобразований вычислений, переименования и создания свойств .
Службы вычислений
Преобразование вычислений можно использовать для применения формулы к исходным данным. Эта операция используется для применения формулы к исходным данным и хранения поля результатов.
В разделе "Преобразование( необязательно)" выберите "Добавить вычисления>".
Введите необходимые параметры.
Параметр Description Выбор формулы Выберите существующую формулу из раскрывающегося списка или выберите "Пользователь" , чтобы ввести формулу вручную. Выходные данные Укажите отображаемое имя выходных данных для результата. Формула Введите формулу, применяемую к исходным данным. Description Укажите описание преобразования. Последнее известное значение При необходимости используйте последнее известное значение, если текущее значение недоступно. В поле "Формула" можно ввести или изменить формулу. Формула может использовать поля в исходных данных. Введите
@
или нажмите клавиши CTRL +ПРОБЕЛ , чтобы выбрать точки данных из раскрывающегося списка.Вы можете ввести свойства метаданных MQTT с помощью формата
@$metadata.user_properties.<property>
или@$metadata.topic
. Вы также можете ввести заголовки $metadata с помощью формата@$metadata.<header>
. Синтаксис$metadata
необходим только для свойств MQTT, входящих в заголовок сообщения. Дополнительные сведения см. в ссылках на поля.Формула может использовать поля в исходных данных. Например, можно использовать
temperature
поле в исходных данных для преобразования температуры в Цельсию и хранения его вtemperatureCelsius
поле вывода.Выберите Применить.
Переименовать
Можно переименовать точку данных с помощью преобразования "Переименовать ". Эта операция используется для переименования точки данных в исходных данных в новое имя. Новое имя можно использовать на последующих этапах потока данных.
В разделе "Преобразование( необязательно)" выберите "Переименовать>добавить".
Введите необходимые параметры.
Параметр Description Точка данных Выберите точку данных из раскрывающегося списка или введите заголовок $metadata. Новое имя точки данных Введите новое имя точки данных. Description Укажите описание преобразования. Введите
@
или нажмите клавиши CTRL +ПРОБЕЛ , чтобы выбрать точки данных из раскрывающегося списка.Вы можете ввести свойства метаданных MQTT с помощью формата
@$metadata.user_properties.<property>
или@$metadata.topic
. Вы также можете ввести заголовки $metadata с помощью формата@$metadata.<header>
. Синтаксис$metadata
необходим только для свойств MQTT, входящих в заголовок сообщения. Дополнительные сведения см. в ссылках на поля.Выберите Применить.
Новое свойство
Вы можете добавить новое свойство в исходные данные с помощью преобразования нового свойства . Эта операция используется для добавления нового свойства в исходные данные. Новое свойство можно использовать на последующих этапах потока данных.
В разделе "Преобразование" (необязательно) выберите "Добавить новое свойство>".
Введите необходимые параметры.
Параметр Description Ключ свойства Введите ключ для нового свойства. Значение свойства Введите значение нового свойства. Description Укажите описание нового свойства. Выберите Применить.
Дополнительные сведения см. в статье "Сопоставление данных с помощью потоков данных" и "Преобразование данных с помощью потоков данных".
Сериализация данных в соответствии со схемой
Если необходимо сериализовать данные перед отправкой в место назначения, необходимо указать формат схемы и сериализации. В противном случае данные сериализуются в ФОРМАТЕ JSON с выводом типов. Конечные точки хранилища, такие как Microsoft Fabric или Azure Data Lake, требуют схемы для обеспечения согласованности данных. Поддерживаемые форматы сериализации — Parquet и Delta.
Совет
Чтобы создать схему из примера файла данных, используйте вспомогательный элемент schema Gen.
Для операций укажите формат схемы и сериализации в сведениях о конечной точке потока данных. Конечные точки, поддерживающие форматы сериализации, — Microsoft Fabric OneLake, Azure Data Lake Storage 2-го поколения, Azure Data Explorer и локальное хранилище. Например, чтобы сериализовать данные в разностном формате, необходимо отправить схему в реестр схем и ссылаться на нее в конфигурации конечной точки назначения потока данных.
Дополнительные сведения о реестре схем см. в разделе "Общие сведения о схемах сообщений".
Назначение
Чтобы настроить назначение для потока данных, укажите ссылку на конечную точку и назначение данных. Список назначений данных для конечной точки можно указать.
Чтобы отправить данные в место назначения, отличное от локального брокера MQTT, создайте конечную точку потока данных. Сведения о настройке конечных точек потока данных см. в статье "Настройка конечных точек потока данных". Если назначение не является локальным брокером MQTT, его необходимо использовать в качестве источника. Дополнительные сведения см. в статье "Потоки данных" должны использовать локальную конечную точку брокера MQTT.
Внимание
Для конечных точек хранилища требуется схема сериализации. Чтобы использовать поток данных с Microsoft Fabric OneLake, Azure Data Lake Storage, Обозреватель данных Azure или локальное хранилище, необходимо указать ссылку на схему.
Выберите конечную точку потока данных, используемую в качестве назначения.
Для конечных точек хранилища требуется схема сериализации. Если выбрать конечную точку назначения Microsoft Fabric OneLake, Azure Data Lake Storage, Azure Data Explorer или конечную точку назначения локального хранилища, необходимо указать ссылку на схему. Например, чтобы сериализовать данные в конечную точку Microsoft Fabric в разностном формате, необходимо отправить схему в реестр схем и ссылаться на нее в конфигурации конечной точки потока данных.
Нажмите кнопку "Продолжить", чтобы настроить назначение.
Введите необходимые параметры для назначения, включая раздел или таблицу для отправки данных. Дополнительные сведения см. в разделе "Настройка назначения данных" (раздел, контейнер или таблица).
Настройка назначения данных (раздел, контейнер или таблица)
Как и источники данных, назначение данных — это концепция, используемая для повторного использования конечных точек потока данных в нескольких потоках данных. По сути, он представляет подкаталог в конфигурации конечной точки потока данных. Например, если конечная точка потока данных является конечной точкой хранения, назначение данных — это таблица в учетной записи хранения. Если конечная точка потока данных является конечной точкой Kafka, назначение данных — это раздел Kafka.
Тип конечной точки | Значение назначения данных | Description |
---|---|---|
MQTT (или сетка событий) | Раздел | Раздел MQTT, в котором отправляются данные. Поддерживаются только статические разделы, без подстановочных знаков. |
Kafka (или Центры событий) | Раздел | Раздел Kafka, в котором отправляются данные. Поддерживаются только статические разделы, без подстановочных знаков. Если конечная точка является пространством имен Центров событий, назначение данных — это отдельный концентратор событий в пространстве имен. |
Azure Data Lake Storage | Контейнер | Контейнер в учетной записи хранения. Не таблица. |
Microsoft Fabric OneLake | Таблица или папка | Соответствует типу настроенного пути для конечной точки. |
Azure Data Explorer | Таблица | Таблица в базе данных Azure Data Explorer. |
Локальное хранилище | Папка | Имя папки или каталога в локальном хранилище постоянного тома. При использовании хранилища контейнеров Azure, включенного томами Azure Arc Cloud Ingest Edge, это должно соответствовать параметру spec.path созданного подволока. |
Чтобы настроить назначение данных, выполните следующие действия.
При использовании интерфейса операций поле назначения данных автоматически интерпретируется на основе типа конечной точки. Например, если конечная точка потока данных является конечной точкой хранилища, страница сведений о назначении предложит ввести имя контейнера. Если конечная точка потока данных является конечной точкой MQTT, страница сведений о назначении предложит ввести раздел и т. д.
Пример
В следующем примере показана конфигурация потока данных, которая использует конечную точку MQTT для источника и назначения. Источник фильтрует данные из раздела azure-iot-operations/data/thermostat
MQTT. Преобразование преобразует температуру в Fahrenheit и фильтрует данные, в которых температура умножается на влажность менее 100000. Назначение отправляет данные в раздел factory
MQTT.
Дополнительные примеры конфигураций потока данных см. в статье AZURE REST API — поток данных и краткое руководство по Bicep.
Проверка работы потока данных
Следуйте инструкциям из руководства. Мост MQTT двунаправленного MQTT для Сетка событий Azure для проверки работы потока данных.
Экспорт конфигурации потока данных
Чтобы экспортировать конфигурацию потока данных, можно использовать операции или экспортировать пользовательский ресурс потока данных.
Выберите поток данных, который вы хотите экспортировать, и выберите " Экспорт " на панели инструментов.
Правильная конфигурация потока данных
Чтобы убедиться, что поток данных работает должным образом, проверьте следующее:
- Конечная точка потока данных MQTT по умолчанию должна использоваться как источник или назначение.
- Профиль потока данных существует и ссылается на конфигурацию потока данных.
- Источник — это конечная точка MQTT, конечная точка Kafka или ресурс. Конечные точки типа хранилища нельзя использовать в качестве источника.
- При использовании Сетки событий в качестве источника число экземпляров профиля потока данных имеет значение 1, так как брокер MQTT сетки событий не поддерживает общие подписки.
- При использовании Центров событий в качестве источника каждый концентратор событий в пространстве имен является отдельным разделом Kafka и должен быть указан в качестве источника данных.
- Преобразование, если используется, настроено с правильным синтаксисом, включая надлежащее экранирование специальных символов.
- При использовании конечных точек типа хранилища в качестве назначения указывается схема.