次の方法で共有


多変量異常検出

リアルタイム インテリジェンスでの多変量異常検出の一般的な情報については、「Microsoft Fabric での多変量異常検出 - 概要」を参照してください。 このチュートリアルでは、サンプル データを使用して、Python ノートブックの Spark エンジンを使用して多変量異常検出モデルをトレーニングします。 その後、Eventhouse エンジンを使用してトレーニング済みのモデルを新しいデータに適用することで、異常を予測します。 最初のいくつかの手順で環境を設定し、次の手順でモデルをトレーニングし、異常を予測します。

前提条件

パート 1- OneLake の可用性を有効にする

Eventhouse でデータを取得する前に、OneLake の可用性を 有効にする必要があります。 この手順は、取り込んだデータが OneLake で使用できるようになるため、重要です。 後の手順では、Spark Notebook からこの同じデータにアクセスしてモデルをトレーニングします。

  1. Real Time Analytics でワークスペースのホーム ページを参照します。

  2. 「前提条件」で作成した Eventhouse を選択します。 データの保存先となるデータストアを選択します。

  3. [データベースの詳細]タイルの [OneLake の可用性] の横にある 鉛筆アイコンを選択します

  4. 右側のウィンドウで、ボタンを [有効化] に切り替えます。

  5. 完了 を選択します。

    Eventhouse で OneLake の可用性を有効にするスクリーンショット。

パート 2- KQL Python プラグインを有効にする

この手順では、Eventhouse で Python プラグインを有効にします。 この手順は、KQL クエリセットで異常の予測 Python コードを実行するために必要です。 time-series-anomaly-detector パッケージを含む適切なパッケージを選択することが重要です。

  1. Eventhouse 画面でデータベースを選択し、リボンから [管理]>[プラグイン] を選択します。

  2. [プラグイン] ウィンドウで、Python 言語拡張機能[オン] に切り替えます。

  3. Python 3.11.7 DL (プレビュー)を選択します。

  4. 完了 を選択します。

    Eventhouse で Python パッケージ 3.11.7 DL を有効にする方法のスクリーンショット。

パート 3- Spark 環境を作成する

この手順では、Spark エンジンを使用して多変量異常検出モデルをトレーニングする Python ノートブックを実行する Spark 環境を作成します。 環境を作成する方法について詳しくは、「環境の作成と管理」に関するページを参照してください。

  1. エクスペリエンス スイッチャーで、[Data Engineering] を選択します。 既にData Engineering エクスペリエンスを使用している場合は、[ホーム] に移動します。

  2. [作成を推奨される項目] から、[環境] を選択し、環境に MVAD_ENV 名を入力します。

    Data Engineering での環境の作成のスクリーンショット。

  3. ライブラリで、[パブリック ライブラリ]を選択します。

  4. [PyPI から追加] を選択します。

  5. 検索ボックスに、「time-series-anomaly-detector」と入力します。 バージョンには、最新バージョンが自動的に設定されます。 このチュートリアルは、Kusto Python 3.11.7 DL に含まれるバージョンであるバージョン 0.2.7 を使用して作成されました。

  6. [保存] を選択します。

    PyPI パッケージを Spark 環境に追加するスクリーンショット。

  7. 環境内の [ホーム] タブを選択します。

  8. リボンから、[発行] アイコンを選択します。

  9. すべて公開を選択します。 この手順は、完了するまでに数分かかることがあります。

    環境の公開のスクリーンショット。

パート 4- Eventhouse にデータを取り込む

  1. データを格納する KQL データベースにカーソルを合わせます。 その他のメニュー [...]>[データの取得]>[ローカル ファイル] を選択します。

    ローカル ファイルからデータを取得するスクリーンショット。

  2. [+ 新しいテーブル] を選択しテーブル名に「demo_stocks_change」と入力します。

  3. [データのアップロード] ダイアログで、[ファイルの参照] を選択し前提条件でダウンロードしたサンプル データ ファイルをアップロードします

  4. [次へ] を選択します。

  5. [データの検査] セクションで、[最初の行が列ヘッダー][オン] に切り替えます。

  6. 完了 を選択します。

  7. データをアップロードしたら、[閉じる] を選択します。

パート 5- OneLake パスをテーブルにコピーする

demo_stocks_changeテーブルを選択してください。 [テーブルの詳細] タイルで、[パスのコピー] を選択して OneLake パスをクリップボードにコピーします。 このコピーしたテキストをテキスト エディターのどこかに保存して、後の手順で使用します。

OneLake パスのコピーのスクリーンショット。

パート 6- ノートブックを準備する

  1. エクスペリエンス スイッチャーで [開発] を選択し、ワークスペースを選択します。

  2. [インポート][ノートブック] を選択し、[このコンピューターから] を選択します。

  3. [アップロード] を選択し、前提条件でダウンロードしたノートブックを選択します。

  4. ノートブックがアップロードされたら、ワークスペースからノートブックを見つけて開くことができます。

  5. 上部のリボンで、[ワークスペースのデフォルト] ドロップダウンを選択し、前の手順で作成した環境を選択します。

    ノートブックで環境を選択するスクリーンショット。

パート 7- ノートブックを実行する

  1. 標準パッケージをインポートします。

    import numpy as np
    import pandas as pd
    
  2. Spark では OneLake ストレージに安全に接続するために ABFSS URI が必要であるため、次の手順では、OneLake URI を ABFSS URI に変換するためにこの関数を定義します。

    def convert_onelake_to_abfss(onelake_uri):
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    uri_without_scheme = onelake_uri[8:]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    account_name = parts[0].split('.')[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
    return abfss_uri
    
  3. パート 5- OneLake パスをテーブルにコピーする」 からコピーした OneLake URI を入力し、demo_stocks_change テーブルを pandas データフレームに読み込みます。

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. 次のセルを実行して、トレーニングデータフレームと予測データフレームを準備します。

    Note

    実際の予測は、「パート 9- Predict-anomalies-in-the-kql-queryset」 で Eventhouse によってデータに対して実行されます。 運用シナリオでは、イベントハウスにデータをストリーミングする場合、新しいストリーミング データに対して予測が行われます。 チュートリアルの目的上、データセットはトレーニングと予測のために日付ごとに 2 つのセクションに分割されています。 これは、履歴データと新しいストリーミング データをシミュレートするためです。

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. セルを実行してモデルをトレーニングし、Fabric MLflow モデル レジストリに保存します。

    import mlflow
    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name="mvad_5_stocks_model",
        )
    
    # Extract the registered model path to be used for prediction using Kusto Python sandbox
    
    mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
    model_abfss = mi.latest_versions[0].source
    print(model_abfss)
    
  6. 最後のセル出力からモデル URI をコピーします。 これは、後の手順で使用します。

パート 8- KQL クエリセットを設定する

一般的な情報については、「KQL クエリセットの作成」を参照してください。

  1. エクスペリエンス スイッチャーで、[リアルタイム インテリジェンス] を選択します。
  2. ワークスペースを選択します。
  3. [+ 新規項目]>[KQL クエリセット] を選択します。 名前 MultivariateAnomalyDetectionTutorial を入力します。
  4. [作成] を選択します
  5. OneLake データ ハブ ウィンドウで、データを格納した KQL データベースを選択します。
  6. [接続] を選択します。

パート 9- KQL クエリセットの異常を予測する

  1. 次の '.create-or-alter function' クエリをコピーして貼り付けて実行し、predict_fabric_mvad_fl() ストアド関数を定義します。

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. 次の予測クエリをコピーして貼り付けます。

    1. 手順 7 の最後にコピーした出力モデル URI を置き換えます。
    2. クエリを実行します。 トレーニング済みのモデルに基づいて 5 つのストックの多変量異常を検出し、結果を anomalychart としてレンダリングします。 異常なポイントは最初のストック (AAPL) にレンダリングされますが、多変量の異常 (つまり、特定の日付の 5 つのストックの共同変更の異常) を表します。
    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc 
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

結果の異常グラフは次の図のようになります。

多変量異常出力のスクリーンショット。

リソースをクリーンアップする

チュートリアルが完了したら、作成したリソースを削除して、他のコストが発生しないようにすることができます。 リソースを削除するには、次の手順を実行します。

  1. ワークスペースのホームページを参照します。
  2. このチュートリアルで作成した環境を削除します。
  3. このチュートリアルで作成したノートブックを削除します。
  4. このチュートリアルで使用した Eventhouse または database を削除します。
  5. このチュートリアルで作成した KQL クエリセットを削除します。