Condividi tramite


Rilevamento anomalie multivariato

Per informazioni generali sul rilevamento anomalie multivariato in Intelligence in tempo reale, vedere Rilevamento anomalie multivariato in Microsoft Fabric - panoramica. In questa esercitazione si usano dati di esempio per eseguire il training di un modello di rilevamento anomalie multivariato usando il motore Spark in un notebook Python. Si stimano quindi le anomalie applicando il modello sottoposto a training ai nuovi dati usando il motore Eventhouse. I primi passaggi consentono di configurare gli ambienti e i passaggi seguenti eseguono il training del modello e stimano le anomalie.

Prerequisiti

Parte 1- Abilitare la disponibilità di OneLake

La disponibilità di OneLake deve essere abilitata prima di ottenere i dati nell’Eventhouse. Questo passaggio è importante perché consente di rendere disponibili i dati inseriti in OneLake. In un passaggio successivo si accede a questi stessi dati dal notebook di Spark per eseguire il training del modello.

  1. Dal tuo spazio di lavoro, seleziona l'Eventhouse che hai creato nei prerequisiti. Scegliere il database in cui archiviare i dati.

  2. Nella sezione dettagli del Database selezionare l'Icona a forma di matita accanto a Disponibilità di OneLake

  3. Nel riquadro destro posizionare il pulsante su Attivo.

  4. Selezionare Fatto.

    Screenshot dell'abilitazione della disponibilità di OneLake in Eventhouse.

Parte 2- Abilitare il plug-in Python KQL

In questo passaggio si abilita il plug-in Python nella Eventhouse. Questo passaggio è necessario per eseguire il codice Python per stimare le anomalie nel set di query KQL. È importante scegliere il pacchetto corretto che contiene il pacchetto serie temporale-rilevamento anomalie.

  1. Nella schermata Eventhouse, seleziona il tuo database, quindi seleziona Gestisci>plug-in dalla barra multifunzione.

  2. Nel riquadro Plug-in impostare l'estensione del linguaggio Python suOn.

  3. Selezionare Python 3.11.7 DL (anteprima).

  4. Selezionare Fatto.

    Screenshot per come abilitare il pacchetto Python 3.11.7 DL nell’Eventhouse.

Parte 3- Creare un ambiente Spark

In questo passaggio si crea un ambiente Spark per eseguire il notebook Python che esegue il training del modello di rilevamento anomalie multivariato usando il motore Spark. Per ulteriori informazioni sulla creazione di ambienti, vedi Creare e gestire ambienti.

  1. Nella tua area di lavoro, selezionare + Nuovo elemento quindi Ambiente.

    Screenshot del riquadro Ambiente nella finestra Nuovo elemento.

  2. Immettere il nome MVAD_ENV per l'ambiente.

  3. In Cataloghi selezionare Cataloghi pubblici.

  4. Selezionare Aggiungi da PyPi.

  5. Nella casella di ricerca immettere serie temporale-rilevamento anomalie. La versione viene popolata automaticamente con la versione più recente. Questa esercitazione è stata creata usando la versione 0.2.7, che è la versione inclusa nella DL Kusto Python 3.11.7.

  6. Seleziona Salva.

    Screenshot dell'aggiunta del pacchetto PyPI all'ambiente Spark.

  7. Selezionare la scheda Home nell’ambiente.

  8. Seleziona l’icona Pubblica nella barra multifunzione.

  9. Selezionare Pubblica tutto. Questo passaggio può richiedere alcuni minuti.

    Screenshot della pubblicazione dell'ambiente.

Parte 4- Ottenere i dati nell’Eventhouse

  1. Passare il puntatore del mouse sul database KQL in cui archiviare i dati. Selezionare il menu Altro [...]>Ottieni dati>del file Locale.

    Screenshot di ottieni dati dal file locale.

  2. Selezionare + Nuova tabella e immettere demo_stocks_change come nome tabella.

  3. Nella finestra di dialogo Carica dati, selezionare Cerca file e caricare il file di dati campione scaricato nella sezione Prerequisiti

  4. Selezionare Avanti.

  5. Nella sezione Ispezione dati, impostare Prima riga è intestazione di colonna su On.

  6. Selezionare Fine.

  7. Al termine del caricamento selezionare Chiudi.

Parte 5- Copiare il percorso di OneLake nella tabella

Assicurarsi di selezionare la tabella demo_stocks_change. Nel riquadro Dettagli tabella selezionare Copia percorso per copiare il percorso di OneLake negli appunti. Salvare il testo copiato in un editor di testo da usare in un passaggio successivo.

Screenshot della copia del percorso di OneLake.

Parte 6- Preparare il notebook

  1. Selezionare l'area di lavoro.

  2. Selezionare Importa, Notebook e quindi Da questo computer.

  3. Selezionare Carica e scegliere il notebook scaricato nei prerequisiti.

  4. Dopo aver caricato il notebook, è possibile trovare e aprire il notebook dall'area di lavoro.

  5. Nella barra multifunzione superiore selezionare l'elenco a discesa predefinito dell’Area di lavoro e selezionare l'ambiente creato nel passaggio precedente.

    Screenshot della selezione dell'ambiente nel notebook.

7- Eseguire il notebook

  1. Importare pacchetti standard.

    import numpy as np
    import pandas as pd
    
  2. Spark richiede un URI ABFSS per connettersi in modo sicuro all'archiviazione OneLake, quindi il passaggio successivo definisce questa funzione per convertire l'URI di OneLake in URI ABFSS.

    def convert_onelake_to_abfss(onelake_uri):
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    uri_without_scheme = onelake_uri[8:]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    account_name = parts[0].split('.')[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
    return abfss_uri
    
  3. Immettere l'URI di OneLake copiato dal Parte 5- Copiare il percorso di OneLake nella tabella per caricare la tabella demo_stocks_change in un dataframe pandas.

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. Eseguire le celle seguenti per preparare i dataframe di training e previsione.

    Nota

    Le stime effettive verranno eseguite sui dati dalla Eventhouse nella parte 9- Previsione-anomalie-nel-set-di-query-kql. In uno scenario di produzione, se si esegue lo streaming dei dati nell’eventhouse, le stime verrebbero effettuate sui nuovi dati di streaming. Ai fini dell'esercitazione, il set di dati è stato suddiviso per data in due sezioni per il training e la previsione. Si tratta di simulare dati storici e nuovi dati di streaming.

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. Eseguire le celle per eseguire il training del modello e salvarlo nel registro dei modelli MLflow di Fabric.

    import mlflow
    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name="mvad_5_stocks_model",
        )
    
    # Extract the registered model path to be used for prediction using Kusto Python sandbox
    
    mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
    model_abfss = mi.latest_versions[0].source
    print(model_abfss)
    
  6. Copiare l'URI del modello dall'ultimo output della cella da usare in un passaggio successivo.

Parte 8- Configurare il set di query KQL

Per informazioni generali, vedere Creare un set di query KQL.

  1. Dalla tua area di lavoro, seleziona +Nuovo elemento>set di query KQL.
  2. Immettere il nome MultivariateAnomalyDetectionTutoriale quindi selezionare Crea.
  3. Nella finestra dell'hub dati OneLake selezionare il database KQL in cui sono stati archiviati i dati.
  4. Selezionare Connetti.

Parte 9- Stimare le anomalie nel set di query KQL

  1. Copiare/incollare ed eseguire la seguente query '.crea-o-modifica funzione' per definire la predict_fabric_mvad_fl() funzione archiviata:

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. Copiare/incollare la seguente query di previsione.

    1. Sostituire l'URI del modello di output copiato alla fine del passaggio 7.
    2. Esegui la query. Rileva anomalie multivariate sui cinque titoli, in base al modello sottoposto a training ed esegue il rendering dei risultati come anomalychart. I punti anomali vengono sottoposti a rendering sul primo titolo (AAPL), anche se rappresentano anomalie multivariate (in altre parole, anomalie delle modifiche comuni delle cinque azioni nella data specifica).
    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc 
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

Il grafico delle anomalie risultante dovrebbe essere simile all'immagine seguente:

Screenshot dell'output delle anomalie multivariate.

Pulire le risorse

Al termine del tutorial, è possibile eliminare le risorse create per evitare di incorrere in altri costi. Per rimuovere le risorse, seguire questa procedura:

  1. Passare alla home page dell'area di lavoro.
  2. Eliminare l’ambiente creato in questa esercitazione.
  3. Eliminare il notebook creato in questo tutorial.
  4. Eliminare l'eventhouse o il database usato in questo tutorial.
  5. Eliminare il KQL creato in questa esercitazione.