Compartir vía


Detección de anomalías multivariante

Para obtener información general sobre la detección de anomalías multivariantes en inteligencia en tiempo real, consulta Detección de anomalías multivariantes en Microsoft Fabric: información general. En este tutorial, usarás datos de muestra para entrenar un modelo de detección de anomalías multivariante usando el motor Spark en un cuaderno Python. Después, predecirá anomalías aplicando el modelo entrenado a nuevos datos mediante el motor de Eventhouse. Los primeros pasos configuran los entornos y los pasos siguientes entrenan el modelo y predicen anomalías.

Requisitos previos

Parte 1: Habilitación de la disponibilidad de OneLake

La disponibilidad de OneLake debe estar habilitada antes de obtener datos en Eventhouse. Este paso es importante, ya que permite que los datos que ingieres estén disponibles en OneLake. En un paso posterior, tendrás acceso a estos mismos datos desde Spark Notebook para entrenar el modelo.

  1. Vaya a la página de inicio de su área de trabajo en Real-Time Intelligence.

  2. Seleccione el Eventhouse que creó en la sección anterior. Elige la base de datos en la que quieres almacenar los datos.

  3. En el icono Detalles de la base de datos, selecciona el icono de lápiz al lado de disponibilidad de OneLake

  4. En el panel derecho, cambia el botón a Activado.

  5. Seleccione Listo.

    Captura de pantalla de la habilitación de la disponibilidad de OneLake en Eventhouse.

Parte 2: Habilitación de la extensión de Python KQL

En este paso, habilitarás el complemento de Python en Eventhouse. Este paso es necesario para ejecutar el código de Python de predicción de anomalías en el conjunto de consultas KQL. Es importante elegir el paquete correcto que contiene el paquete time-series-anomaly-detector.

  1. En la pantalla Eventhouse, selecciona la base de datos y, a continuación, Administrar>Extensiones en el desplegable.

  2. En el panel Extensiones, cambia la Extensión del lenguaje Python a Activado.

  3. Selecciona Python 3.11.7 DL (versión preliminar).

  4. Seleccione Listo.

    Captura de pantalla de cómo habilitar el paquete de Python 3.11.7 DL en Eventhouse.

Parte 3: Creación de un entorno de Spark

En este paso, crearás un entorno de Spark para ejecutar el cuaderno de Python que entrena el modelo de detección de anomalías multivariante mediante el motor de Spark. Para obtener más información sobre la creación de entornos, consulta Crear y administrar entornos.

  1. En el conmutador de experiencia, elija Ingeniería de datos. Si ya estás en la experiencia de Ingeniero de datos, navega hasta Inicio.

  2. En Elementos recomendados para crear, seleccione Entornos e introduzca el nombre MVAD_ENV para el entorno.

    Captura de pantalla de la creación de un entorno en Ingeniería de datos.

  3. En Bibliotecas, selecciona Bibliotecas públicas.

  4. Selecciona Agregar desde PyPI.

  5. En el cuadro de búsqueda, escribe time-series-anomaly-detector. La versión se completa automáticamente con la versión más reciente. Este tutorial se creó con la versión 0.2.7, que es la versión incluida en Kusto Python 3.11.7 DL.

  6. Seleccione Guardar.

    Captura de pantalla de la adición del paquete PyPI al entorno de Spark.

  7. Selecciona la pestaña Inicios en el entorno.

  8. Selecciona el icono Publicar del desplegable.

  9. Seleccione Publicar todo. Este paso puede tardar varios minutos en completarse.

    Captura de pantalla de la publicación del entorno.

Parte 4: Obtención de datos en Eventhouse

  1. Mueve el puntero sobre la base de datos de KQL en la que quieres almacenar los datos. Selecciona el menú Más [...]>Obtener datos>Archivo local.

    Captura de pantalla de obtención de datos del archivo local.

  2. Selecciona + Nueva tabla y escribe demo_stocks_change como nombre de tabla.

  3. En el cuadro de diálogo cargar datos, selecciona Buscar archivos y carga el archivo de datos de ejemplo que descargaste en los Requisitos previos

  4. Seleccione Siguiente.

  5. En la sección Inspeccionar los datos, cambia el botón de La primera fila es el encabezado de columna a Activado.

  6. Seleccione Finalizar.

  7. Una vez que los datos estén subidos, selecciona Cerrar.

Parte 5: Copia de la ruta de acceso OneLake a la tabla

Asegúrate de seleccionar la tabla demo_stocks_change. En el icono Detalles de la tabla, selecciona Copiar ruta de acceso para copiar la ruta de acceso de OneLake en el Portapapeles. Guarda este texto copiado en un editor de texto en algún lugar para usarlo en un paso posterior.

Captura de pantalla de la copia de la ruta de acceso de OneLake.

Parte 6: Preparación del cuaderno

  1. En el conmutador de experiencia, elija Desarrollar y seleccione el área de trabajo.

  2. Seleccione Importar, cuaderno y luego Desde este equipo.

  3. Selecciona Cargar y elige el cuaderno que descargaste en Requisitos previos.

  4. Una vez cargado el cuaderno, puede encontrar y abrir el cuaderno desde el área de trabajo.

  5. En el desplegable superior, selecciona el menú desplegable Área de trabajo predeterminada y selecciona el entorno que creaste en el paso anterior.

    Captura de pantalla de la selección del entorno en el cuaderno.

Parte 7: Ejecución del cuaderno

  1. Importar paquetes estándar.

    import numpy as np
    import pandas as pd
    
  2. Spark necesita un identificador URI de ABFSS para conectarse de forma segura a OneLake Storage, por lo que el siguiente paso define esta función para convertir el URI de OneLake en URI de ABFSS.

    def convert_onelake_to_abfss(onelake_uri):
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    uri_without_scheme = onelake_uri[8:]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    account_name = parts[0].split('.')[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
    return abfss_uri
    
  3. Escribe el identificador URI de OneLake copiado de la Parte 5: Copia de la ruta de acceso OneLake a la tabla para cargar la tabla demo_stocks_change en un dataframe pandas.

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. Ejecuta las celdas siguientes para preparar los dataframes de entrenamiento y predicción.

    Nota:

    Eventhouse ejecutará las predicciones reales en los datos en la Parte 9: Predicción de anomalías en el conjunto de consultas KQL. En un escenario de producción, si transmitieras datos al centro de eventos, las predicciones se realizarían en los nuevos datos de streaming. Para el tutorial, el conjunto de datos se ha dividido por fecha en dos secciones para el entrenamiento y la predicción. Esto es para simular datos históricos y nuevos datos de streaming.

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. Ejecuta las celdas para entrenar el modelo y guárdala en el registro de modelos de MLflow de Fabric.

    import mlflow
    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name="mvad_5_stocks_model",
        )
    
    # Extract the registered model path to be used for prediction using Kusto Python sandbox
    
    mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
    model_abfss = mi.latest_versions[0].source
    print(model_abfss)
    
  6. Copia el identificador URI del modelo de la última salida de celda. Lo usarás en un paso posterior.

Parte 8: Configuración del conjunto de consultas KQL

Para obtener información general, consulta Creación de un conjunto de consultas KQL.

  1. En el conmutador de experiencia, elige Inteligencia en tiempo real.
  2. Seleccione su área de trabajo.
  3. Selecciona +Nuevo elemento>Conjunto de consultas KQL. Escribe el nombre MultivariateAnomalyDetectionTutorial.
  4. Seleccione Crear.
  5. En la ventana del centro de datos de OneLake, selecciona la base de datos KQL donde almacenaste los datos.
  6. Seleccione Conectar.

Parte 9: Predicción de anomalías en el conjunto de consultas KQL

  1. Copia o pega y ejecuta la siguiente consulta “.create-or-alter function” para definir la función almacenada predict_fabric_mvad_fl():

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. Copia o pega la siguiente consulta de predicción.

    1. Reemplaza el identificador URI del modelo de salida copiado al final del Paso 7.
    2. Ejecute la consulta. Detecta anomalías multivariante en las cinco acciones, en función del modelo entrenado y representa los resultados como anomalychart. Los puntos anómalos se representan en la primera acción (AAPL), aunque representan anomalías multivariantes (es decir, anomalías de los cambios conjuntos de las cinco existencias en la fecha específica).
    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc 
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

El gráfico de anomalías resultante debe tener un aspecto similar a la siguiente imagen:

Captura de pantalla de la salida de anomalías multivariantes.

Limpieza de recursos

Cuando termine el tutorial, puede eliminar los recursos que creó para evitar incurrir en otros costos. Para eliminar los recursos, siga estos pasos:

  1. Vaya a la página principal del área de trabajo.
  2. Elimina el entorno que hayas creado en este tutorial.
  3. Elimine el cuaderno que haya creado en este tutorial.
  4. Elimine el Eventhouse o la base de datos usados en este tutorial.
  5. Elimina el conjunto de consultas KQL que hayas creado en este tutorial.