Condividi tramite


Rilevamento anomalie multivariate con foresta di isolamento

Questo articolo illustra come usare SynapseML in Apache Spark per il rilevamento anomalie multivariato. Il rilevamento delle anomalie multivariate consente il rilevamento di anomalie tra molte variabili o timeseries, tenendo conto di tutte le correlazioni e le dipendenze tra le diverse variabili. In questo scenario si usa SynapseML per eseguire il training di un modello di foresta di isolamento per il rilevamento anomalie multivariato e quindi viene usato per il modello sottoposto a training per dedurre anomalie multivariate all'interno di un set di dati contenente misurazioni sintetiche da tre sensori IoT.

Per maggiori informazioni sul modello di foresta di isolamento, vedere il documento originale di Liu et al..

Prerequisiti

  • Collegare il notebook a un lakehouse. Sul lato sinistro, selezionare Aggiungi per aggiungere un lakehouse esistente o creare un lakehouse.

Importazioni di raccolte

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Dati di input

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Leggere i dati

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

convertire le colonne in tipi di dati appropriati

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Preparazione dei dati di training

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Preparazione dei dati di test

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Eseguire il training del modello foresta di isolamento

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Verrà quindi creata una pipeline di apprendimento automatico per eseguire il training del modello di foresta di isolamento. Viene inoltre illustrato come creare un esperimento MLflow e registrare il modello sottoposto a training.

La registrazione del modello MLflow è strettamente necessaria solo se si accede al modello sottoposto a training in un secondo momento. Per il training del modello e l'esecuzione dell'inferenza nello stesso notebook, il modello a oggetti del modello è sufficiente.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Eseguire l'inferenza

Caricare il modello di foresta di isolamento sottoposto a training

Eseguire l'inferenza

df_test_pred = model.transform(df_test)
display(df_test_pred)

Rilevamento anomalie pre-fatti

Rilevamento anomalie di Azure AI

  • Stato anomalie del punto più recente: genera un modello usando i punti precedenti e determina se il punto più recente è anomalo (Scala, Python)
  • Trovare anomalie: genera un modello usando un'intera serie e trova anomalie nella serie (Scala, Python)