Обработка правил на основе настраиваемых порогов в Azure Stream Analytics
В этой статье описывается использование эталонных данных для получения решения для оповещения, использующего правила на основе настраиваемых порогов в Azure Stream Analytics.
Сценарий: оповещение на основе правил с настраиваемыми порогами
Вам может потребоваться получать оповещения, когда входящие передаваемые потоком события достигают определенного значения или сводное значение на основе входящих передаваемых потоком событий превышает определенный порог. Несложно настроить запрос Stream Analytics, который сравнивает значение со статическим порогом, который зафиксирован и предопределен. Фиксированный порог может быть жестко запрограммирован в синтаксической конструкции запроса потоковой передачи с помощью простых арифметических операторов сравнения (больше, меньше и равно).
В некоторых случаях значения порогов должны легко настраиваться без необходимости редактировать синтаксические конструкции запросов каждый раз, когда значение порога изменяется. В других случаях может потребоваться обрабатывать одним и тем же запросом множество устройств или пользователей с разными значениями порогов для каждого типа устройства.
Этот шаблон можно использовать для динамической настройки порогов, выбирая тип устройства, к которому применяется порог, и фильтруя входные данные, а также выбирая поля, включаемые в выходные данные.
Рекомендуемый конструктивный шаблон
Используйте входные эталонные данные в задании Stream Analytics при поиске порогов оповещений.
- Храните значения порогов в эталонных данных (одно значение на ключ).
- Присоедините события входных потоковых данных к эталонным данным по ключевому столбцу.
- Используйте значение с ключом из эталонных данных в качестве значения порога.
Пример данных и запроса
В этом примере оповещения генерируются, когда результат статистической обработки данных потоковой передачи с устройств на минутном интервале соответствует значениям в правиле, предоставленном в качестве эталонных данных.
В запросе для каждого deviceId и каждого metricName для deviceId можно настроить от 0 до 5 измерений для операции GROUP BY. Группируются только события с соответствующими значениями фильтра. После группирования вычисляются статистические выражения Min, Max и Avg для 60-секундного "переворачивающегося окна". Затем для создания выходного события оповещения вычисляются фильтры по статистическим значениям в соответствии с настроенным порогом в эталонных данных.
Предположим, что имеется задание Stream Analytics с входными эталонными данными rules и входными потоковыми данными metrics.
Ссылочные данные
В этом примере эталонных данных показано, как можно представить правило на основе порогов. JSON-файл содержит эталонные данные и сохраняется в хранилище BLOB-объектов Azure, и этот контейнер хранилища BLOB-объектов используется в качестве входных эталонных данных rules. Со временем можно будет перезаписать этот JSON-файл и заменить конфигурацию правила, не останавливая или запуская задание потоковой передачи.
- Пример правила представляет настраиваемое оповещение о ситуации, когда загрузка ЦП (среднее значение больше или равно) превышает
90
процентов. При необходимости можно настроить полеvalue
. - Обратите внимание на то, в правиле имеется поле operator, которое позже динамически интерпретируется в синтаксической конструкции запроса на основе
AVGGREATEROREQUAL
. - Правило фильтрует данные по определенному ключу измерения
2
со значениемC1
. Остальные поля являются пустыми строками, то есть входной поток не фильтруется по этим полям событий. При необходимости можно настроить дополнительные правила ЦП для фильтрации по другим полям. - Не все столбцы будут включены в выходное событие оповещения. В этом случае для ключа
includedDim
номер2
задано значениеTRUE
, чтобы выходные события содержали поле номер 2 данных событий из потока. Другие поля не будут включены в выходное оповещение, однако список полей можно изменить.
{
"ruleId": 1234,
"deviceId" : "978648",
"metricName": "CPU",
"alertName": "hot node AVG CPU over 90",
"operator" : "AVGGREATEROREQUAL",
"value": 90,
"includeDim": {
"0": "FALSE",
"1": "FALSE",
"2": "TRUE",
"3": "FALSE",
"4": "FALSE"
},
"filter": {
"0": "",
"1": "",
"2": "C1",
"3": "",
"4": ""
}
}
Пример запроса потоковых данных
Этот пример запроса Stream Analytics объединяет эталонные данные rules из примера выше с входным потоком данных metrics.
WITH transformedInput AS
(
SELECT
dim0 = CASE rules.includeDim.[0] WHEN 'TRUE' THEN metrics.custom.dimensions.[0].value ELSE NULL END,
dim1 = CASE rules.includeDim.[1] WHEN 'TRUE' THEN metrics.custom.dimensions.[1].value ELSE NULL END,
dim2 = CASE rules.includeDim.[2] WHEN 'TRUE' THEN metrics.custom.dimensions.[2].value ELSE NULL END,
dim3 = CASE rules.includeDim.[3] WHEN 'TRUE' THEN metrics.custom.dimensions.[3].value ELSE NULL END,
dim4 = CASE rules.includeDim.[4] WHEN 'TRUE' THEN metrics.custom.dimensions.[4].value ELSE NULL END,
metric = metrics.metric.value,
metricName = metrics.metric.name,
deviceId = rules.deviceId,
ruleId = rules.ruleId,
alertName = rules.alertName,
ruleOperator = rules.operator,
ruleValue = rules.value
FROM
metrics
timestamp by eventTime
JOIN
rules
ON metrics.deviceId = rules.deviceId AND metrics.metric.name = rules.metricName
WHERE
(rules.filter.[0] = '' OR metrics.custom.filters.[0].value = rules.filter.[0]) AND
(rules.filter.[1] = '' OR metrics.custom.filters.[1].value = rules.filter.[1]) AND
(rules.filter.[2] = '' OR metrics.custom.filters.[2].value = rules.filter.[2]) AND
(rules.filter.[3] = '' OR metrics.custom.filters.[3].value = rules.filter.[3]) AND
(rules.filter.[4] = '' OR metrics.custom.filters.[4].value = rules.filter.[4])
)
SELECT
System.Timestamp as time,
transformedInput.deviceId as deviceId,
transformedInput.ruleId as ruleId,
transformedInput.metricName as metric,
transformedInput.alertName as alert,
AVG(metric) as avg,
MIN(metric) as min,
MAX(metric) as max,
dim0, dim1, dim2, dim3, dim4
FROM
transformedInput
GROUP BY
transformedInput.deviceId,
transformedInput.ruleId,
transformedInput.metricName,
transformedInput.alertName,
dim0, dim1, dim2, dim3, dim4,
ruleOperator,
ruleValue,
TumblingWindow(second, 60)
HAVING
(
(ruleOperator = 'AVGGREATEROREQUAL' AND avg(metric) >= ruleValue) OR
(ruleOperator = 'AVGEQUALORLESS' AND avg(metric) <= ruleValue)
)
Пример входных потоковых данных события
Этот пример данных JSON представляет входные данные metrics, используемые в запросе потоковых данных, приведенном выше.
- На минутном временном диапазоне отображены три примера событий со значением
T14:50
. - У всех трех событий
deviceId
имеет значение978648
. - Значения метрики ЦП отличаются в каждом событии:
98
,95
и80
соответственно. Только в первых двух примерах событий превышен порог правила генерации оповещений для ЦП, установленный в правиле. - Поле includeDim в правиле генерации оповещений было ключом номер 2. Соответствующее поле ключа 2 в примере события называется
NodeName
. Три примера событий имеют значенияN024
,N024
иN014
соответственно. В выходных данных отображается только узелN024
, так как это единственные данные, соответствующие условиям оповещения о высокой загрузке ЦП.N014
не превышает порог высокой загрузки ЦП. - В правиле генерации оповещений настроен оператор
filter
только по ключу номер 2, который соответствует полюcluster
в примерах событий. Все три примера событий имеют значениеC1
и соответствуют условиям фильтра.
{
"eventTime": "2018-04-30T14:50:23.1324132Z",
"deviceId": "978648",
"custom": {
"dimensions": {
"0": {
"name": "NodeType",
"value": "N1"
},
"1": {
"name": "Cluster",
"value": "C1"
},
"2": {
"name": "NodeName",
"value": "N024"
}
},
"filters": {
"0": {
"name": "application",
"value": "A1"
},
"1": {
"name": "deviceType",
"value": "T1"
},
"2": {
"name": "cluster",
"value": "C1"
},
"3": {
"name": "nodeType",
"value": "N1"
}
}
},
"metric": {
"name": "CPU",
"value": 98,
"count": 1.0,
"min": 98,
"max": 98,
"stdDev": 0.0
}
}
{
"eventTime": "2018-04-30T14:50:24.1324138Z",
"deviceId": "978648",
"custom": {
"dimensions": {
"0": {
"name": "NodeType",
"value": "N2"
},
"1": {
"name": "Cluster",
"value": "C1"
},
"2": {
"name": "NodeName",
"value": "N024"
}
},
"filters": {
"0": {
"name": "application",
"value": "A1"
},
"1": {
"name": "deviceType",
"value": "T1"
},
"2": {
"name": "cluster",
"value": "C1"
},
"3": {
"name": "nodeType",
"value": "N2"
}
}
},
"metric": {
"name": "CPU",
"value": 95,
"count": 1,
"min": 95,
"max": 95,
"stdDev": 0
}
}
{
"eventTime": "2018-04-30T14:50:37.1324130Z",
"deviceId": "978648",
"custom": {
"dimensions": {
"0": {
"name": "NodeType",
"value": "N3"
},
"1": {
"name": "Cluster",
"value": "C1 "
},
"2": {
"name": "NodeName",
"value": "N014"
}
},
"filters": {
"0": {
"name": "application",
"value": "A1"
},
"1": {
"name": "deviceType",
"value": "T1"
},
"2": {
"name": "cluster",
"value": "C1"
},
"3": {
"name": "nodeType",
"value": "N3"
}
}
},
"metric": {
"name": "CPU",
"value": 80,
"count": 1,
"min": 80,
"max": 80,
"stdDev": 0
}
}
Пример выходных данных
В этом примере выходных данных JSON показано отдельное событие оповещения, которые было сгенерировано на основе правила порога ЦП, определенного в эталонных данных. Выходное событие содержит имя оповещения, а также результаты статистических выражений (average, min и max) для рассматриваемых полей. Данные выходного события содержат значение N024
ключа поля номер 2 NodeName
ввиду конфигурации правила. (Данные JSON были изменены для отображения разрывов строк для удобства чтения.)
{"time":"2018-05-01T02:03:00.0000000Z","deviceid":"978648","ruleid":1234,"metric":"CPU",
"alert":"hot node AVG CPU over 90","avg":96.5,"min":95.0,"max":98.0,
"dim0":null,"dim1":null,"dim2":"N024","dim3":null,"dim4":null}