Erkennung von multivariaten Anomalien mit Isolation Forest
Dieser Artikel veranschaulicht, wie Sie SynapseML in Apache Spark für die Erkennung von multivariaten Anomalien verwenden können. Die Erkennung von multivariaten Anomalien ermöglicht die Erkennung von Anomalien unter vielen Variablen oder Zeitreihen unter Berücksichtigung aller Interkorrelationen und Abhängigkeiten zwischen den verschiedenen Variablen. In diesem Szenario verwenden wir SynapseML, um ein Isolation Forest-Modell für die Erkennung von multivariaten Anomalien zu trainieren. Anschließend verwenden wir das trainierte Modell, um multivariate Anomalien in einem Dataset abzuleiten, das synthetische Messungen von drei IoT-Sensoren enthält.
Weitere Informationen zum Isolation Forest-Modell finden Sie in der Originalarbeit von Liu et al..
Voraussetzungen
- Fügen Sie Ihr Notebook an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.
Bibliotheksimporte
from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from synapse.ml.isolationforest import *
from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
if running_on_synapse():
shell = TerminalInteractiveShell.instance()
shell.define_macro("foo", """a,b=10,20""")
Eingabedaten
# Table inputs
timestampColumn = "timestamp" # str: the name of the timestamp column in the table
inputCols = [
"sensor_1",
"sensor_2",
"sensor_3",
] # list(str): the names of the input variables
# Training Start time, and number of days to use for training:
trainingStartTime = (
"2022-02-24T06:00:00Z" # datetime: datetime for when to start the training
)
trainingEndTime = (
"2022-03-08T23:55:00Z" # datetime: datetime for when to end the training
)
inferenceStartTime = (
"2022-03-09T09:30:00Z" # datetime: datetime for when to start the training
)
inferenceEndTime = (
"2022-03-20T23:55:00Z" # datetime: datetime for when to end the training
)
# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0
Lesen von Daten
df = (
spark.read.format("csv")
.option("header", "true")
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
)
)
Umwandeln von Spalten in geeignete Datentypen
df = (
df.orderBy(timestampColumn)
.withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
.drop("_c5")
)
display(df)
Vorbereiten der Trainingsdaten
# filter to data with timestamps within the training window
df_train = df.filter(
(F.col(timestampColumn) >= trainingStartTime)
& (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)
Vorbereiten der Testdaten
# filter to data with timestamps within the inference window
df_test = df.filter(
(F.col(timestampColumn) >= inferenceStartTime)
& (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)
Trainieren des Isolation Forest-Modells
isolationForest = (
IsolationForest()
.setNumEstimators(num_estimators)
.setBootstrap(False)
.setMaxSamples(max_samples)
.setMaxFeatures(max_features)
.setFeaturesCol("features")
.setPredictionCol("predictedLabel")
.setScoreCol("outlierScore")
.setContamination(contamination)
.setContaminationError(0.01 * contamination)
.setRandomSeed(1)
)
Als Nächstes erstellen wir eine ML-Pipeline, um das Isolation Forest-Modell zu trainieren. Außerdem wird veranschaulicht, wie Sie ein MLflow-Experiment erstellen und das trainierte Modell registrieren.
Die Registrierung des MLflow-Modells ist nur erforderlich, wenn zu einem späteren Zeitpunkt auf das trainierte Modell zugegriffen wird. Zum Trainieren des Modells und zum Ziehen von Rückschlüssen im selben Notebook ist das Modellobjektmodell ausreichend.
va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)
Ziehen von Rückschlüssen
Laden des trainierten Isolation Forest-Modells
Ziehen von Rückschlüssen
df_test_pred = model.transform(df_test)
display(df_test_pred)
Vorgemachte Anomalieerkennung
- Anomaliestatus des letzten Punkts: Generiert ein Modell anhand vorheriger Punkte und ermittelt, ob der letzte Punkt anomal ist. (Scala, Python)
- Anomalien suchen: Generiert ein Modell anhand einer ganzen Reihe und findet Anomalien in der Reihe. (Scala, Python)