Del via


Flervariat avviksregistrering med isolasjonsskog

Denne artikkelen viser hvordan du kan bruke SynapseML på Apache Spark for flervariat avviksgjenkjenning. Multivariate anomaly detection allows for the detection of anomalies among many variables or timeseries, taking into account all the inter-correlations and dependencies between the different variables. I dette scenarioet bruker vi SynapseML til å lære opp en Isolation Forest-modell for flervariat avviksgjenkjenning, og vi bruker deretter den opplærte modellen til å utlede multivariate anomalier i et datasett som inneholder syntetiske målinger fra tre IoT-sensorer.

Hvis du vil lære mer om Isolation Forest-modellen, kan du se den opprinnelige artikkelen av Liu et al..

Forutsetning

  • Legg notatblokken til et lakehouse. På venstre side velger du Legg til for å legge til et eksisterende innsjøhus eller opprette et innsjøhus.

Bibliotekimport

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Inndata

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Lese data

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

gi kolonner til aktuelle datatyper

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Forberedelse av opplæringsdata

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Test dataforberedelse

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Train Isolation Forest-modell

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Deretter oppretter vi en ML-rørledning for å lære opp Isolation Forest-modellen. Vi demonstrerer også hvordan du oppretter et MLflow-eksperiment og registrerer den opplærte modellen.

Registrering av MLflow-modell er bare nødvendig hvis du får tilgang til den opplærte modellen senere. For å lære opp modellen og utføre inferencing i samme notatblokk, er modellobjektmodellen tilstrekkelig.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Utfør inferencing

Last inn den opplærte isolasjonsskogmodellen

Utfør inferencing

df_test_pred = model.transform(df_test)
display(df_test_pred)

Forhåndslaget avviksdetektor

Azure AI avviksdetektor

  • Avviksstatus for siste punkt: genererer en modell ved hjelp av foregående punkter og bestemmer om det nyeste punktet er uregelmessig (Scala, Python)
  • Finn avvik: genererer en modell ved hjelp av en hel serie og finner avvik i serien (Scala, Python)