Delen via


Meervoudige anomaliedetectie met isolatieforest

In dit artikel wordt beschreven hoe u SynapseML in Apache Spark kunt gebruiken voor anomaliedetectie met meerdere variabelen. Met multivariate anomaliedetectie kunnen afwijkingen tussen veel variabelen of tijdreeksen worden gedetecteerd, rekening houdend met alle intercorrelaties en afhankelijkheden tussen de verschillende variabelen. In dit scenario gebruiken we SynapseML om een isolatieforestmodel te trainen voor anomaliedetectie met meerderevariaties. Vervolgens gebruiken we het getrainde model om afwijkingen met meerdere variabelen in een gegevensset af te leiden die synthetische metingen van drie IoT-sensoren bevatten.

Raadpleeg het originele document van Liu et al voor meer informatie over het isolatieforestmodel.

Vereisten

  • Koppel uw notitieblok aan een lakehouse. Selecteer aan de linkerkant Toevoegen om een bestaand lakehouse toe te voegen of een lakehouse te maken.

Bibliotheekimport

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Invoergegevens

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Gegevens lezen

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

kolommen casten naar de juiste gegevenstypen

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Trainingsgegevensvoorbereiding

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Gegevensvoorbereiding testen

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Isolatieforestmodel trainen

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Vervolgens maken we een ML-pijplijn om het isolatieforestmodel te trainen. We laten ook zien hoe u een MLflow-experiment maakt en het getrainde model registreert.

MLflow-modelregistratie is strikt vereist als u het getrainde model op een later tijdstip opent. Voor het trainen van het model en het uitvoeren van deductie in hetzelfde notebook is het modelobjectmodel voldoende.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Deductie uitvoeren

Het getrainde isolatieforestmodel laden

Deductie uitvoeren

df_test_pred = model.transform(df_test)
display(df_test_pred)

Vooraf gemaakte Anomaly Detector

Azure AI Anomaly Detector

  • Anomaliestatus van het laatste punt: genereert een model met behulp van voorgaande punten en bepaalt of het laatste punt afwijkend is (Scala, Python)
  • Afwijkingen zoeken: genereert een model met behulp van een hele reeks en zoekt afwijkingen in de reeks (Scala, Python)