Udostępnij za pośrednictwem


Wielowariancyjne wykrywanie anomalii za pomocą lasu izolacji

W tym artykule pokazano, jak można używać języka SynapseML na platformie Apache Spark do wykrywania anomalii wielowariancyjnych. Wielowariancyjne wykrywanie anomalii umożliwia wykrywanie anomalii między wieloma zmiennymi lub czasownikami, biorąc pod uwagę wszystkie korelacje i zależności między różnymi zmiennymi. W tym scenariuszu używamy usługi SynapseML do trenowania modelu lasu izolacji na potrzeby wykrywania anomalii wielowariancyjnych, a następnie używamy do wytrenowanego modelu w celu wnioskowania anomalii wielowariancyjnych w zestawie danych zawierającego syntetyczne pomiary z trzech czujników IoT.

Aby dowiedzieć się więcej na temat modelu Lasu izolacji, zapoznaj się z oryginalnym dokumentem Liu et al..

Wymagania wstępne

  • Dołącz notes do magazynu lakehouse. Po lewej stronie wybierz pozycję Dodaj , aby dodać istniejący obiekt lakehouse lub utworzyć jezioro.

Importowanie biblioteki

from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow

from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline

from synapse.ml.isolationforest import *

from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *

if running_on_synapse():
    shell = TerminalInteractiveShell.instance()
    shell.define_macro("foo", """a,b=10,20""")

Dane wejściowe

# Table inputs
timestampColumn = "timestamp"  # str: the name of the timestamp column in the table
inputCols = [
    "sensor_1",
    "sensor_2",
    "sensor_3",
]  # list(str): the names of the input variables

# Training Start time, and number of days to use for training:
trainingStartTime = (
    "2022-02-24T06:00:00Z"  # datetime: datetime for when to start the training
)
trainingEndTime = (
    "2022-03-08T23:55:00Z"  # datetime: datetime for when to end the training
)
inferenceStartTime = (
    "2022-03-09T09:30:00Z"  # datetime: datetime for when to start the training
)
inferenceEndTime = (
    "2022-03-20T23:55:00Z"  # datetime: datetime for when to end the training
)

# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0

Odczyt danych

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
    )
)

rzutowanie kolumn do odpowiednich typów danych

df = (
    df.orderBy(timestampColumn)
    .withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    .withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
    .withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
    .withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
    .drop("_c5")
)

display(df)

Przygotowywanie danych treningowych

# filter to data with timestamps within the training window
df_train = df.filter(
    (F.col(timestampColumn) >= trainingStartTime)
    & (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)

Przygotowywanie danych testowych

# filter to data with timestamps within the inference window
df_test = df.filter(
    (F.col(timestampColumn) >= inferenceStartTime)
    & (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)

Trenowanie modelu lasu izolacji

isolationForest = (
    IsolationForest()
    .setNumEstimators(num_estimators)
    .setBootstrap(False)
    .setMaxSamples(max_samples)
    .setMaxFeatures(max_features)
    .setFeaturesCol("features")
    .setPredictionCol("predictedLabel")
    .setScoreCol("outlierScore")
    .setContamination(contamination)
    .setContaminationError(0.01 * contamination)
    .setRandomSeed(1)
)

Następnie utworzymy potok uczenia maszynowego w celu wytrenowania modelu lasu izolacji. Pokazano również, jak utworzyć eksperyment MLflow i zarejestrować wytrenowany model.

Rejestracja modelu MLflow jest ściśle wymagana tylko w przypadku uzyskiwania dostępu do wytrenowanego modelu w późniejszym czasie. Do trenowania modelu i wnioskowania w tym samym notesie wystarczy model obiektów modelu.

va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)

Wnioskowanie

Ładowanie wytrenowanego modelu lasu izolacji

Wnioskowanie

df_test_pred = model.transform(df_test)
display(df_test_pred)

Wstępnie wykonane Narzędzie do wykrywania anomalii

Narzędzie do wykrywania anomalii sztucznej inteligencji platformy Azure

  • Stan anomalii najnowszego punktu: generuje model przy użyciu poprzednich punktów i określa, czy najnowszy punkt jest nietypowy (Scala, Python)
  • Znajdowanie anomalii: generuje model przy użyciu całej serii i znajduje anomalie w serii (Scala, Python)