격리 포리스트를 사용하여 다변량 변칙 검색
이 문서에서는 다변량 변칙 검색을 위해 Apache Spark에서 SynapseML을 사용하는 방법을 보여줍니다. 다변량 변칙 검색을 사용하면 다양한 변수 간의 모든 상호 상관 관계 및 종속성을 고려하여 여러 변수 또는 t시계열 사이에서 변칙을 검색할 수 있습니다. 이 시나리오에서는 SynapseML을 사용하여 다변량 변칙 검색을 위해 격리 포리스트 모델을 학습한 다음 학습된 모델을 사용하여 세 개의 IoT 센서에서 가상 측정을 포함하는 데이터 세트 내에서 다변량 변칙을 유추합니다.
격리 포리스트 모델에 대한 자세한 내용은 Liu 외의 원본 논문을 참조하세요.
필수 조건
- 레이크하우스에 Notebook을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.
라이브러리 가져오기
from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from synapse.ml.isolationforest import *
from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
if running_on_synapse():
shell = TerminalInteractiveShell.instance()
shell.define_macro("foo", """a,b=10,20""")
입력 데이터
# Table inputs
timestampColumn = "timestamp" # str: the name of the timestamp column in the table
inputCols = [
"sensor_1",
"sensor_2",
"sensor_3",
] # list(str): the names of the input variables
# Training Start time, and number of days to use for training:
trainingStartTime = (
"2022-02-24T06:00:00Z" # datetime: datetime for when to start the training
)
trainingEndTime = (
"2022-03-08T23:55:00Z" # datetime: datetime for when to end the training
)
inferenceStartTime = (
"2022-03-09T09:30:00Z" # datetime: datetime for when to start the training
)
inferenceEndTime = (
"2022-03-20T23:55:00Z" # datetime: datetime for when to end the training
)
# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0
데이터 읽기
df = (
spark.read.format("csv")
.option("header", "true")
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
)
)
열을 적절한 데이터 형식으로 캐스팅
df = (
df.orderBy(timestampColumn)
.withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
.drop("_c5")
)
display(df)
학습 데이터 준비
# filter to data with timestamps within the training window
df_train = df.filter(
(F.col(timestampColumn) >= trainingStartTime)
& (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)
테스트 데이터 준비
# filter to data with timestamps within the inference window
df_test = df.filter(
(F.col(timestampColumn) >= inferenceStartTime)
& (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)
격리 포리스트 모델 학습
isolationForest = (
IsolationForest()
.setNumEstimators(num_estimators)
.setBootstrap(False)
.setMaxSamples(max_samples)
.setMaxFeatures(max_features)
.setFeaturesCol("features")
.setPredictionCol("predictedLabel")
.setScoreCol("outlierScore")
.setContamination(contamination)
.setContaminationError(0.01 * contamination)
.setRandomSeed(1)
)
다음으로 격리 포리스트 모델을 학습하는 ML 파이프라인을 만듭니다. 또한 MLflow 실험을 만들고 학습된 모델을 등록하는 방법을 보여 줍니다.
MLflow 모델 등록은 나중에 학습된 모델에 액세스하는 경우에만 엄격하게 필요합니다. 모델을 학습시키고 동일한 Notebook에서 추론을 수행하려면 모델 개체 모델로 충분합니다.
va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)
추론 수행
학습된 격리 포리스트 모델 로드
추론 수행
df_test_pred = model.transform(df_test)
display(df_test_pred)
미리 만든 Anomaly Detector
- 최신 지점의 변칙 상태: 이전 지점을 사용하여 모델을 생성하고 최신 지점이 변칙인지 판단합니다. (Scala, Python)
- 변칙 찾기: 전체 시리즈를 사용하여 모델을 생성하고 시리즈에서 변칙을 찾습니다. (Scala, Python)