AnomalyDetection_ChangePoint (Azure Stream Analytics)
Detekuje trvalé anomálie ve streamu událostí časové řady. Základní model strojového učení používá algoritmus Martingales výměny.
Syntaxe
AnomalyDetection_ChangePoint(
<scalar_expression>,
<confidence>,
<historySize>)
OVER ([PARTITION BY <partition key>]
LIMIT DURATION(<unit>, <length>)
[WHEN boolean_expression])
Argumenty
Scalar_expression
Sloupec události nebo vypočítané pole, přes které model provádí detekci anomálií. Povolené hodnoty tohoto parametru zahrnují datové typy FLOAT nebo BIGINT, které vracejí jednu (skalární) hodnotu.
Výraz se zástupným znakem * není povolen. scalar_expression navíc nemůžou obsahovat jiné analytické nebo externí funkce.
Důvěru
Procentuální číslo od 1,00 do 100 (včetně), které nastavuje citlivost modelu strojového učení. Čím nižší je spolehlivost, tím vyšší je počet zjištěných anomálií a naopak. Začněte od libovolného čísla mezi 70 a 90 a upravte ho na základě výsledků pozorovaných při vývoji nebo testování.
velikost historie
Počet událostí v posuvném okně, ze kterého se model průběžně učí, a používá je k vyhodnocení další události z hlediska anomálií. Obvykle by to mělo představovat časové období normálního chování, aby model mohl označit následnou anomálii. Začněte se vzdělaným odhadem s využitím historických protokolů a upravte je na základě výsledků pozorovaných ve vývoji nebo testu.
OVER ([ partition_by_clause ] limit_duration_clause [when_clause])
partition_by_clause
Slouží k rozdělení trénování modelu na základě konkrétního sloupce v událostech. Model použije stejné nastavení parametru funkce napříč všemi oddíly.
limit_duration_clause DURATION(jednotka, délka)
Velikost posuvného okna v rámci Stream Analytics z hlediska času. Doporučená velikost tohoto časového intervalu je ekvivalentem času, který trvá generování historieSize počet událostí v stabilním stavu.
when_clause
Určuje logickou podmínku pro události, které se mají poskytnout modelu za účelem detekce anomálií. When_clause je nepovinný.
Návratové typy
Funkce vrátí vnořený záznam složený z následujících sloupců:
IsAnomaly
BIGINT (0 nebo 1) označující, jestli byla událost neobvyklá nebo ne.
Skóre
Vypočítané Martingale skóre (float) označující, jak neobvyklá událost je. Toto skóre roste exponenciálně s neobvyklými hodnotami.
Příklady
V následující ukázce dotazu první dotaz předpokládá událost každých 5 minut a druhý dotaz předpokládá událost každou sekundu. Úroveň spolehlivosti je pro oba modely nastavená na 75.
AnomalyDetection_ChangePoint(reading, 75, 72)
OVER (LIMIT DURATION(hour, 6))
AnomalyDetection_ChangePoint(temperature, 75, 120)
OVER ([PARTITION BY sensorId] LIMIT DURATION(second, 120))
Příklad za předpokladu jednotné rychlosti vstupu 1 událost za sekundu ve 20minutovém posuvném okně s velikostí historie 1200 událostí. Konečný příkaz SELECT extrahuje a vypíše skóre a stav anomálií s úrovní spolehlivosti 80 %.
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200)
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
Příklad s neujednotným vstupním proudem, který je jednotný pomocí přeskakujícího okna 1 sekundy:
WITH SmootheningStep AS
(
SELECT
System.Timestamp() as time,
AVG(CAST(temperature as float)) as temp
FROM input
GROUP BY TUMBLINGWINDOW(second, 1)
),
AnomalyDetectionStep AS
(
SELECT
time,
temp,
AnomalyDetection_ChangePoint(temp, 80, 1200)
OVER(LIMIT DURATION(minute, 20)) as ChangePointScores
FROM SmootheningStep
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
Příklad s rozděleným dotazem pro trénování samostatného modelu pro každý senzor:
WITH AnomalyDetectionStep AS
(
SELECT
sensorid,
System.Timestamp() as time,
CAST(temperature as float) as temp,
AnomalyDetection_ChangePoint(CAST(temperature as float), 80, 1200)
OVER(PARTITION BY sensorid
LIMIT DURATION(minute, 20)) as ChangePointScores
FROM input
)
SELECT
CAST (sensorid as nvarchar(max)) as sensoridstring,
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') as float) as
ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') as bigint) as
IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep