AnomalyDetection_ChangePoint (Azure Stream Analytics)
Wykrywa trwałe anomalie w strumieniu zdarzeń szeregów czasowych. Podstawowy model uczenia maszynowego używa algorytmu Exchangeability Martingales.
Składnia
AnomalyDetection_ChangePoint(
<scalar_expression>,
<confidence>,
<historySize>)
OVER ([PARTITION BY <partition key>]
LIMIT DURATION(<unit>, <length>)
[WHEN boolean_expression])
Argumenty
Scalar_expression
Kolumna zdarzenia lub obliczone pole, w którym model wykonuje wykrywanie anomalii. Dozwolone wartości tego parametru obejmują typy danych FLOAT lub BIGINT zwracające pojedynczą wartość (skalarną).
Wyrażenie wieloznaczne * jest niedozwolone. Ponadto scalar_expression nie może zawierać innych funkcji analitycznych ani funkcji zewnętrznych.
Zaufania
Liczba procentowa z zakresu od 1,00 do 100 (włącznie), która określa wrażliwość modelu uczenia maszynowego. Im niższa jest pewność, tym większa liczba wykrytych anomalii i odwrotnie. Zacznij od dowolnej liczby z zakresu od 70 do 90 i dostosuj tę wartość w oparciu o wyniki obserwowane podczas programowania lub testowania.
historySize
Liczba zdarzeń w oknie przesuwanym, z którego model stale uczy się i używa go do oceniania następnego zdarzenia w przypadku anomalii. Zazwyczaj powinno to reprezentować okres normalnego zachowania, aby umożliwić modelowi flagę kolejnej anomalii. Zacznij od wykształconego odgadnięcia przy użyciu dzienników historycznych i dostosuj się na podstawie wyników obserwowanych w programowania lub teście.
OVER ([ partition_by_clause ] limit_duration_clause [when_clause])
partition_by_clause
Służy do partycjonowania trenowania modelu na podstawie określonej kolumny w zdarzeniach. Model stosuje te same ustawienia parametrów funkcji we wszystkich partycjach.
limit_duration_clause DURATION(jednostka, długość)
Rozmiar okna przesuwanego w usłudze Stream Analytics pod względem czasu. Zalecany rozmiar tego przedziału czasu jest odpowiednikiem czasu generowania historiiLiczba zdarzeń w stanie stabilnym.
when_clause
Określa warunek logiczny dla zdarzeń, które mają być udostępniane modelowi w celu przeprowadzenia wykrywania anomalii. When_clause jest opcjonalny.
Typy zwracane
Funkcja zwraca zagnieżdżony rekord składający się z następujących kolumn:
IsAnomaly
BIGINT (0 lub 1) wskazujący, czy zdarzenie było nietypowe, czy nie.
Ocena
Obliczony wynik Martingale (float) wskazujący, jak nietypowe jest zdarzenie. Ten wynik rośnie wykładniczo z nietypowymi wartościami.
Przykłady
W poniższym przykładzie zapytania pierwsze zapytanie zakłada zdarzenie co 5 minut, a drugie zapytanie zakłada zdarzenie co sekundę. Poziom ufności jest ustawiony na 75 dla obu modeli.
AnomalyDetection_ChangePoint(reading, 75, 72)
OVER (LIMIT DURATION(hour, 6))
AnomalyDetection_ChangePoint(temperature, 75, 120)
OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))
Przykład przy założeniu, że jednolita szybkość wprowadzania wynosi 1 zdarzenie na sekundę w 20-minutowym oknie przesuwania z rozmiarem historii wynoszącym 1200 zdarzeń. Końcowa instrukcja SELECT wyodrębnia i generuje wynik oraz stan anomalii z poziomem ufności 80%.
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200)
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
Przykład ze strumieniem wejściowym, który jest jednolity przy użyciu okna wirowania 1 sekundy:
WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200)
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
Przykład z partycjonowaną kwerendą w celu wytrenowania oddzielnego modelu na czujnik:
WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200)
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)
SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep