Интеграция Azure Stream Analytics со службой "Машинное обучение Azure"
Модели машинного обучения можно реализовывать в качестве определяемой пользователем функции (UDF) в заданиях Azure Stream Analytics для оценки и прогнозирования входных данных потоковой передачи в реальном времени. Машинное обучение Azure позволяет использовать любое популярное средство с открытым кодом, например Tensorflow, scikit-learn или PyTorch, для подготовки, обучения и развертывания моделей.
Необходимые компоненты
Перед добавлением модели машинного обучения в качестве функции в задание Stream Analytics выполните следующие действия.
Разверните модель как веб-службу с помощью Машинного обучения Azure.
Конечная точка машинного обучения должна иметь связанный с ней файл swagger, который помогает Stream Analytics выяснить схему входных и выходных данных. Вы можете использовать этот пример определения Swagger в качестве контрольного варианта для правильной настройки.
Убедитесь, что веб-служба принимает и возвращает сериализованные данные JSON.
Для крупномасштабных развертываний в рабочей среде подходит Служба Azure Kubernetes, разверните свою модель в ней. Если веб-служба не может обрабатывать количество запросов, поступающих из задания, производительность задания Stream Analytics будет снижена, что влияет на задержку. Модели, развернутые в Экземплярах контейнеров Azure, поддерживаются только при использовании портала Azure.
Добавление модели машинного обучения к заданию
Вы можете добавить функции Машинного обучения Azure в задание Stream Analytics непосредственно на портале Azure или в Visual Studio Code.
Портал Azure
Перейдите к заданию Stream Analytics в портале Azure и выберите Функции в разделе Топология задания. Затем щелкните Служба Машинного обучения Azure в раскрывающемся меню + Добавить.
Заполните форму Функция службы машинного обучение Azure следующими значениями свойств:
В следующей таблице описаны все свойства функций службы Машинного обучения Azure в Stream Analytics.
Свойство | Description |
---|---|
Псевдоним функции | Введите имя для вызова функции в запросе. |
Отток подписок | Вашу подписку Azure. |
Рабочая область службы "Машинное обучение Azure" | Рабочая область машинного обучения Azure, используемая для развертывания модели как веб-службы. |
Конечная точка | Веб-служба, в которой размещается ваша модель. |
Сигнатура функции | Сигнатура веб-службы выводится из спецификации схемы API. Если не удается загрузить сигнатуру, убедитесь, что вы предоставили пример входных и выходных данных в сценарии оценки для автоматического создания схемы. |
Число параллельных запросов на секцию | Это параметр расширенной конфигурации для оптимизации пропускной способности при большом масштабе. Это число представляет параллельные запросы, отправляемые из каждой секции задания в веб-службу. Задания с шестью единицами потоковой передачи (SU) и ниже имеют одну секцию. Задания с 12 SU имеют две секции, 18 — три секции и т. д. Например, если в задании две секции и этот параметр установлен на 4, то в веб-службе будет содержаться восемь одновременных запросов от задания. |
Максимальное количество пакетов | Это параметр расширенной конфигурации для оптимизации пропускной способности при большом масштабе. Это число представляет максимальное число событий, сгруппированных в одном запросе, отправленном в веб-службу. |
Вызов конечной точки машинного обучения из запроса
Когда запрос Stream Analytics вызывает определяемую пользователем функцию Машинного обучения Azure UDF, задание создает сериализованный запрос JSON к веб-службе. Запрос основан на схеме, зависящей от модели, которую Stream Analytics выводит из ответа конечной точки Swagger.
Предупреждение
Машинное обучение конечные точки не вызываются при тестировании с помощью редактора запросов портал Azure, так как задание не выполняется. Чтобы проверить вызов конечной точки на портале, задание Stream Analytics должно выполняться.
Следующий запрос Stream Analytics является примером вызова определяемой пользователем функции Машинного обучения Azure:
SELECT udf.score(<model-specific-data-structure>)
INTO output
FROM input
WHERE <model-specific-data-structure> is not null
Если отправленные в ML UDF входные данные не согласуются с ожидаемой схемой, конечная точка возвратит ответ с кодом ошибки 400, что приведет к переходу задания Stream Analytics в состояние сбоя. Рекомендуется включить журналы ресурсов для задания, что позволит легко отлаживать и устранять такие проблемы. Поэтому настоятельно рекомендуется:
- Проверка входных данных в UDF машинного обучения не имеет значения NULL
- Проверяйте тип для каждого поля, которое передается во входных данных ML UDF, чтобы он соответствовал ожиданиям конечной точки.
Примечание.
Определяемые пользователем функции UDF вычисляются для каждой строки заданного шага запроса, даже при вызове с помощью условного выражения (т. е. CASE WHEN [A] IS NOT NULL THEN udf.score(A) ELSE '' END
). При необходимости используйте предложение WITH для создания разных путей, вызывая определяемую пользователем функцию ML только при необходимости, прежде чем использовать UNION для повторного объединения путей.
Передача нескольких входных параметров в определяемую пользователем функцию
Наиболее распространенными примерами входных данных для моделей машинного обучения являются DataFrame и массивы NumPy. Массив можно создать с помощью определяемой пользователем функции JavaScript, а сериализованный в формате JSON DataFrame — с помощью предложения WITH
.
Создание входного массива
Вы можете создать определяемую пользователем функцию JavaScript, которая принимает N входов и создает массив, который можно использовать в качестве входных данных для определяемой пользователем функции Машинного обучения Azure.
function createArray(vendorid, weekday, pickuphour, passenger, distance) {
'use strict';
var array = [vendorid, weekday, pickuphour, passenger, distance]
return array;
}
После добавления UDF JavaScript в задание можно вызвать Машинное обучение Azure UDF с помощью следующего запроса:
WITH
ModelInput AS (
#use JavaScript UDF to construct array that will be used as input to ML UDF
SELECT udf.createArray(vendorid, weekday, pickuphour, passenger, distance) as inputArray
FROM input
)
SELECT udf.score(inputArray)
INTO output
FROM ModelInput
#validate inputArray is not null before passing it to ML UDF to prevent job from failing
WHERE inputArray is not null
Следующий JSON является примером запроса:
{
"Inputs": {
"WebServiceInput0": [
["1","Mon","12","1","5.8"],
["2","Wed","10","2","10"]
]
}
}
Создание DataFrame Pandas или PySpark
Можно использовать предложение WITH
для создания сериализованного в формате JSON DataFrame, который может быть передан в качестве входных данных для определяемой пользователем функции Машинного обучения Azure, как показано ниже.
Следующий запрос создает DataFrame, выбирая необходимые поля и используя DataFrame в качестве входных данных для функции Машинного обучения Azure.
WITH
Dataframe AS (
SELECT vendorid, weekday, pickuphour, passenger, distance
FROM input
)
SELECT udf.score(Dataframe)
INTO output
FROM Dataframe
WHERE Dataframe is not null
Следующий JSON является примером запроса из состава предыдущего запроса:
{
"Inputs": {
"WebServiceInput0": [
{
"vendorid": "1",
"weekday": "Mon",
"pickuphour": "12",
"passenger": "1",
"distance": "5.8"
},
{
"vendorid": "2",
"weekday": "Tue",
"pickuphour": "10",
"passenger": "2",
"distance": "10"
}]
}
}
Оптимизация производительности для определяемой пользователем функции Машинного обучения Azure
При развертывании модели в службе Kubernetes Azure можно профилировать модель, чтобы определить использование ресурсов. Кроме того, вы можете включить Application Insights для развертываний, чтобы узнать их частоту запросов, время отклика и частоту сбоев.
При наличии сценария с высокой пропускной способностью событий для достижения оптимальной производительности с низкой задержкой может быть необходимо изменить следующие параметры в Stream Analytics.
- Максимальное количество пакетов
- Число параллельных запросов на секцию.
Определение правильного размера пакета
После развертывания веб-службы вы отправляете пример запроса с различными размерами пакетов начиная с 50 и увеличивая его в порядке сотен. Например, 200, 500, 1000, 2000 и т. д. Вы заметите, что после определенного размера пакета задержка ответа увеличится. Точка, после которой увеличивается задержка ответа, должна быть максимальным числом пакетов для вашего задания.
Определение числа параллельных запросов на секцию.
При оптимальном масштабе задание Stream Analytics должно иметь возможность отправить несколько параллельных запросов в веб-службу и получить ответ в течение нескольких миллисекунд. Задержка ответа веб-службы может напрямую влиять на задержку и производительность задания Stream Analytics. Если вызов из задания в веб-службу занимает много времени, скорее всего, вы увидите увеличение задержки водяного знака, а также может увидеть увеличение числа невыполненных входных событий.
Чтобы снизить задержку, обеспечьте достаточное количество узлов и реплик при подготовке кластера Службы Azure Kubernetes (AKS). Очень важно, чтобы веб-служба была высокодоступна и возвращала успешные ответы. Если задание получает сообщение об ошибке, которое может быть извлечено, например сообщение об недоступности службы (503), оно автоматически повторится с экспоненциальным обратным выключением. Если задание получает в ответе от конечной точки одну из перечисленных ниже ошибок, оно переходит в состояние сбоя.
- Недопустимый запрос (400)
- Конфликт (409)
- Не найдено (404)
- Не авторизовано (401)
Ограничения
Если вы используете управляемую конечную точку Azure ML, Stream Analytics в настоящее время может получить доступ только к конечным точкам с включенным общедоступным сетевым доступом. Дополнительные сведения см. на странице о частных конечных точках Машинного обучения Azure.