Udostępnij za pośrednictwem


Wielowymiarowe wykrywanie anomalii

Aby uzyskać ogólne informacje na temat wielowariancyjnego wykrywania anomalii w analizie w czasie rzeczywistym, zobacz Multivariate anomaly detection in Microsoft Fabric - overview (Wykrywanie anomalii wielowariancyjnych w usłudze Microsoft Fabric — omówienie). W tym samouczku użyjesz danych przykładowych do wytrenowania modelu wykrywania anomalii w danych wielowymiarowych przy użyciu silnika Spark w notesie języka Python. Następnie przewidujesz anomalie, stosując wytrenowany model do nowych danych przy użyciu silnika usługi Eventhouse. Pierwsze kilka kroków konfigurowania środowisk oraz następujące kroki przeszkolić model i przewidzieć anomalie.

Wymagania wstępne

Część 1. Włączanie dostępności usługi OneLake

Przed pobraniem danych w magazynie zdarzeń należy włączyć dostępność usługi OneLake. Ten krok jest ważny, ponieważ umożliwia pozyskiwanie danych w usłudze OneLake. W późniejszym kroku uzyskujesz dostęp do tych samych danych z notesu platformy Spark, aby wytrenować model.

  1. W obszarze roboczym wybierz Eventhouse, które utworzyłeś podczas wymagań wstępnych. Wybierz bazę danych, w której chcesz przechowywać dane.

  2. W panelu szczegółów bazy danych przełącz przycisk dostępności OneLake na Włącz.

    Zrzut ekranu przedstawiający włączanie dostępności usługi OneLake w usłudze Eventhouse.

Część 2. Włączanie wtyczki języka Python języka KQL

W tym kroku włączysz wtyczkę języka Python w usłudze Eventhouse. Ten krok jest wymagany do uruchomienia kodu języka Python przewidywania anomalii w zestawie zapytań KQL. Ważne jest, aby wybrać prawidłowy obraz, który zawiera pakiet detektora anomalii szeregów czasowych.

  1. Na ekranie Eventhouse wybierz pozycję Eventhouse>Plugins na wstążce.

  2. W okienku Wtyczki przełącz rozszerzenie języka Python na.

  3. Wybierz pozycję Python 3.11.7 DL (wersja zapoznawcza).

  4. Wybierz pozycję Gotowe.

    Zrzut ekranu przedstawiający sposób włączania pakietu python 3.11.7 DL w usłudze Eventhouse.

Część 3. Tworzenie środowiska Spark

W tym kroku utworzysz środowisko Platformy Spark, aby uruchomić notes języka Python, który trenuje wielowariantny model wykrywania anomalii przy użyciu aparatu Spark. Aby uzyskać więcej informacji na temat tworzenia środowisk, zobacz Tworzenie środowisk i zarządzanie nimi.

  1. W obszarze roboczym wybierz pozycję + Nowy element, a następnie Środowisko.

    Zrzut ekranu przedstawiający kafelek Środowisko w oknie Nowy element.

  2. Wprowadź nazwę MVAD_ENV dla środowiska, a następnie wybierz Utwórz.

  3. Na karcie Narzędzia główne środowiska wybierz Runtime>1.2 (Spark 3.4, Delta 2.4).

  4. W obszarze Biblioteki wybierz pozycję Biblioteki publiczne.

  5. Wybierz pozycję Dodaj z PyPI.

  6. W polu wyszukiwania wprowadź time-series-anomaly-detector. Wersja zostanie automatycznie wypełniona najnowszą wersją. Ten samouczek został utworzony przy użyciu wersji 0.3.2.

  7. Wybierz pozycję Zapisz.

    Zrzut ekranu przedstawiający dodawanie pakietu PyPI do środowiska Spark.

  8. Wybierz kartę Narzędzia główne w środowisku.

  9. Wybierz ikonę Publikuj na wstążce.

  10. Wybierz opcję Publikuj wszystko. Wykonanie tego kroku może potrwać kilka minut.

    Zrzut ekranu przedstawiający publikowanie środowiska.

Część 4. Pobieranie danych do usługi Eventhouse

  1. Umieść kursor na bazie danych KQL, w której chcesz przechowywać dane. Wybierz menu Więcej [...]>Pobierz plik lokalny danych>.

    Zrzut ekranu przedstawiający pobieranie danych z pliku lokalnego.

  2. Wybierz pozycję + Nowa tabela i wprowadź demo_stocks_change jako nazwę tabeli.

  3. W oknie dialogowym przekazywanie danych wybierz pozycję Przeglądaj dla plików i przekaż przykładowy plik danych pobrany w wymaganiach wstępnych

  4. Wybierz Dalej.

  5. W sekcji Inspekcja danych przełącz pozycję Pierwszy wiersz to nagłówek kolumny w pozycji Włączone.

  6. Wybierz Zakończ.

  7. Po przekazaniu danych wybierz pozycję Zamknij.

Część 5. Kopiowanie ścieżki OneLake do tabeli

Upewnij się, że wybrano tabelę demo_stocks_change . W okienku szczegółów tabeli wybierz folder OneLake , aby skopiować ścieżkę OneLake do schowka. Zapisz ten skopiowany tekst w edytorze tekstów, który ma być używany w późniejszym kroku.

Zrzut ekranu przedstawiający kopiowanie ścieżki OneLake.

Część 6. Przygotowywanie notesu

  1. Wybierz obszar roboczy.

  2. Wybierz pozycję Importuj, Notes, a następnie Z tego komputera.

  3. Wybierz pozycję Przekaż i wybierz notes pobrany w wymaganiach wstępnych.

  4. Po przekazaniu notesu możesz znaleźć i otworzyć notes z obszaru roboczego.

  5. Na górnej wstążce wybierz listę rozwijaną Obszar roboczy i wybierz środowisko utworzone w poprzednim kroku.

    Zrzut ekranu przedstawiający wybieranie środowiska w notesie.

Część 7. Uruchamianie notesu

  1. Importuj standardowe pakiety.

    import numpy as np
    import pandas as pd
    
  2. Platforma Spark wymaga identyfikatora URI ABFSS, aby bezpiecznie nawiązać połączenie z magazynem OneLake, więc następny krok definiuje tę funkcję w celu przekonwertowania identyfikatora URI usługi OneLake na identyfikator URI ABFSS.

    def convert_onelake_to_abfss(onelake_uri):
        if not onelake_uri.startswith('https://'):
            raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
        uri_without_scheme = onelake_uri[8:]
        parts = uri_without_scheme.split('/')
        if len(parts) < 3:
            raise ValueError("Invalid OneLake URI format.")
        account_name = parts[0].split('.')[0]
        container_name = parts[1]
        path = '/'.join(parts[2:])
        abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
        return abfss_uri
    
  3. Zastąp symbol zastępczy OneLakeTableURI identyfikatorem URI usługi OneLake skopiowanym z Part 5 — Skopiuj ścieżkę OneLake do tabeli, aby załadować tabelę demo_stocks_change do ramki danych biblioteki pandas.

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. Uruchom następujące komórki, aby przygotować ramki danych trenowania i przewidywania.

    Uwaga

    Rzeczywiste przewidywania będą uruchamiane na danych przez usługę Eventhouse w części 9— Predict-anomalies-in-the-kql-queryset. W scenariuszu produkcyjnym w przypadku przesyłania strumieniowego danych do magazynu zdarzeń przewidywania będą wykonywane na nowych danych przesyłanych strumieniowo. Na potrzeby samouczka zestaw danych został podzielony według daty na dwie sekcje na potrzeby trenowania i przewidywania. Jest to symulacja danych historycznych i nowych danych przesyłanych strumieniowo.

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. Uruchom komórki, aby wytrenować model i zapisać go w rejestrze modeli MLflow sieci szkieletowej.

    import mlflow
    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name="mvad_5_stocks_model",
        )
    
  6. Uruchom następującą komórkę, aby wyodrębnić zarejestrowaną ścieżkę modelu do użycia do przewidywania przy użyciu piaskownicy języka Python Kusto.

    mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
    model_abfss = mi.latest_versions[0].source
    print(model_abfss)
    
  7. Skopiuj identyfikator URI modelu z ostatnich danych wyjściowych komórki do użycia w późniejszym kroku.

Część 8. Konfigurowanie zestawu zapytań KQL

Aby uzyskać ogólne informacje, zobacz Create a KQL queryset (Tworzenie zestawu zapytań KQL).

  1. W obszarze roboczym wybierz pozycję +Nowy element>zestawu zapytań KQL.
  2. Wprowadź nazwę MultivariateAnomalyDetectionTutorial, a następnie wybierz pozycję Utwórz.
  3. W oknie centrum danych OneLake wybierz bazę danych KQL, w której są przechowywane dane.
  4. Wybierz pozycję Połącz.

Część 9. Przewidywanie anomalii w zestawie zapytań KQL

  1. Uruchom następujące zapytanie ".create-or-alter function", aby zdefiniować funkcję przechowywaną predict_fabric_mvad_fl():

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. Uruchom następujące zapytanie przewidywania, zastępując identyfikator URI modelu wyjściowego skopiowaną na końcu kroku 7.

    Zapytanie wykrywa anomalie wielowymiarowe w pięciu akcjach na podstawie wytrenowanego modelu i renderuje wyniki jako anomalychart. Nietypowe punkty są renderowane na pierwszym magazynie (AAPL), choć reprezentują anomalie wielowariancji (innymi słowy, anomalie wspólnych zmian pięciu zapasów w określonej dacie).

    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

Wynikowy wykres anomalii powinien wyglądać jak na poniższej ilustracji:

zrzut ekranu przedstawiający wielowymiarowe wyniki anomalii.

Czyszczenie zasobów

Po ukończeniu samouczka możesz usunąć utworzone zasoby, aby uniknąć ponoszenia innych kosztów. Aby usunąć zasoby, wykonaj następujące kroki:

  1. Przejdź do strony głównej obszaru roboczego.
  2. Usuń środowisko utworzone w tym samouczku.
  3. Usuń notes utworzony w tym samouczku.
  4. Usuń bazę danych lub magazyn zdarzeń używany w tym samouczku.
  5. Usuń zestaw zapytań KQL utworzony w tym samouczku.