การตรวจจับความผิดปกติแบบหลากหลายแบบด้วยฟอเรสต์แยก
บทความนี้แสดงให้เห็นว่าคุณสามารถใช้ SynapseML บน Apache Spark สําหรับการตรวจหาสิ่งผิดปกติที่หลากหลายได้อย่างไร การตรวจหาสิ่งผิดปกติแบบหลากหลายช่วยให้สามารถตรวจหาสิ่งผิดปกติระหว่างตัวแปรหรือเวลาต่าง ๆ จํานวนมากโดยคํานึงถึงความสัมพันธ์และการขึ้นต่อกันทั้งหมดระหว่างตัวแปรที่แตกต่างกัน ในสถานการณ์นี้ เราใช้ SynapseML เพื่อฝึกแบบจําลอง Isolation Forest สําหรับการตรวจหาสิ่งผิดปกติที่หลากหลาย จากนั้นเราใช้แบบจําลองที่ได้รับการฝึกเพื่ออนุมานความผิดปกติที่หลากหลายภายในชุดข้อมูลที่ประกอบด้วยการวัดผลแบบสังเคราะห์จากเซ็นเซอร์ IoT สามตัว
หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับแบบจําลอง Isolation Forest ดูที่กระดาษต้นฉบับโดย Liu et al
ข้อกำหนดเบื้องต้น
- แนบสมุดบันทึกของคุณเข้ากับเลคเฮ้าส์ ทางด้านซ้าย เลือก เพิ่ม เพื่อเพิ่มเลคเฮาส์ที่มีอยู่หรือสร้างเลคเฮ้าส์
การนําเข้าไลบรารี
from IPython import get_ipython
from IPython.terminal.interactiveshell import TerminalInteractiveShell
import uuid
import mlflow
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from synapse.ml.isolationforest import *
from synapse.ml.explainers import *
%matplotlib inline
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
from synapse.ml.core.platform import *
if running_on_synapse():
shell = TerminalInteractiveShell.instance()
shell.define_macro("foo", """a,b=10,20""")
ข้อมูลอินพุต
# Table inputs
timestampColumn = "timestamp" # str: the name of the timestamp column in the table
inputCols = [
"sensor_1",
"sensor_2",
"sensor_3",
] # list(str): the names of the input variables
# Training Start time, and number of days to use for training:
trainingStartTime = (
"2022-02-24T06:00:00Z" # datetime: datetime for when to start the training
)
trainingEndTime = (
"2022-03-08T23:55:00Z" # datetime: datetime for when to end the training
)
inferenceStartTime = (
"2022-03-09T09:30:00Z" # datetime: datetime for when to start the training
)
inferenceEndTime = (
"2022-03-20T23:55:00Z" # datetime: datetime for when to end the training
)
# Isolation Forest parameters
contamination = 0.021
num_estimators = 100
max_samples = 256
max_features = 1.0
อ่านข้อมูล
df = (
spark.read.format("csv")
.option("header", "true")
.load(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/generated_sample_mvad_data.csv"
)
)
แปลงคอลัมน์ให้เป็นชนิดข้อมูลที่เหมาะสม
df = (
df.orderBy(timestampColumn)
.withColumn("timestamp", F.date_format(timestampColumn, "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("sensor_1", F.col("sensor_1").cast(DoubleType()))
.withColumn("sensor_2", F.col("sensor_2").cast(DoubleType()))
.withColumn("sensor_3", F.col("sensor_3").cast(DoubleType()))
.drop("_c5")
)
display(df)
การเตรียมข้อมูลการฝึก
# filter to data with timestamps within the training window
df_train = df.filter(
(F.col(timestampColumn) >= trainingStartTime)
& (F.col(timestampColumn) <= trainingEndTime)
)
display(df_train)
ทดสอบการเตรียมข้อมูล
# filter to data with timestamps within the inference window
df_test = df.filter(
(F.col(timestampColumn) >= inferenceStartTime)
& (F.col(timestampColumn) <= inferenceEndTime)
)
display(df_test)
แบบจําลองป่าแยกรถไฟ
isolationForest = (
IsolationForest()
.setNumEstimators(num_estimators)
.setBootstrap(False)
.setMaxSamples(max_samples)
.setMaxFeatures(max_features)
.setFeaturesCol("features")
.setPredictionCol("predictedLabel")
.setScoreCol("outlierScore")
.setContamination(contamination)
.setContaminationError(0.01 * contamination)
.setRandomSeed(1)
)
ถัดไปเราสร้างไปป์ไลน์ ML เพื่อฝึกแบบจําลอง Isolation Forest นอกจากนี้เรายังสาธิตวิธีการสร้างการทดลอง MLflow และลงทะเบียนแบบจําลองที่ได้รับการฝึกแล้ว
จะต้องลงทะเบียนแบบจําลอง MLflow อย่างเคร่งครัดเฉพาะเมื่อเข้าถึงแบบจําลองที่ได้รับการฝึกไว้ในภายหลัง สําหรับการฝึกแบบจําลองและการดําเนินการอนุมานในสมุดบันทึกเดียวกัน แบบจําลองวัตถุแบบจําลองก็เพียงพอ
va = VectorAssembler(inputCols=inputCols, outputCol="features")
pipeline = Pipeline(stages=[va, isolationForest])
model = pipeline.fit(df_train)
ดําเนินการอนุมาน
โหลดแบบจําลองป่าการแยกที่ได้รับการฝึก
ดําเนินการอนุมาน
df_test_pred = model.transform(df_test)
display(df_test_pred)
เครื่องตรวจจับสิ่งผิดปกติแบบ Premade
เครื่องตรวจจับความผิดปกติของ Azure AI
- สถานะความผิดปกติของจุดล่าสุด: สร้างแบบจําลองโดยใช้จุดก่อนหน้าและพิจารณาว่าจุดล่าสุดผิดปกติ (สเกลา Python) หรือไม่
- ค้นหาสิ่งผิดปกติ: สร้างแบบจําลองโดยใช้ชุดข้อมูลทั้งหมดและค้นหาสิ่งผิดปกติในชุดข้อมูล (Scala, Python)