Partilhar via


Deteção de anomalias multivariadas

Para obter informações gerais sobre a deteção de anomalias multivariadas no Real-Time Intelligence, consulte Deteção de anomalias multivariadas no Microsoft Fabric - visão geral. Neste tutorial, você usará dados de exemplo para treinar um modelo de deteção de anomalias multivariado usando o mecanismo Spark em um notebook Python. Em seguida, você preverá anomalias aplicando o modelo treinado a novos dados usando o mecanismo Eventhouse. As primeiras etapas configuram seus ambientes e as etapas seguintes treinam o modelo e preveem anomalias.

Pré-requisitos

Parte 1- Ativar a disponibilidade do OneLake

A disponibilidade do OneLake deve ser habilitada antes de obter dados na Eventhouse. Esta etapa é importante, pois permite que os dados que você ingere fiquem disponíveis no OneLake. Em uma etapa posterior, você acessa esses mesmos dados do seu Spark Notebook para treinar o modelo.

  1. Navegue até a página inicial do seu espaço de trabalho em Inteligência em Tempo Real.

  2. Selecione a Eventhouse que você criou nos pré-requisitos. Escolha a base de dados onde pretende armazenar os seus dados.

  3. No bloco Detalhes do banco de dados, selecione o ícone de lápis ao lado de Disponibilidade do OneLake

  4. No painel direito, alterne o botão para Ativo.

  5. Selecionar Concluído.

    Captura de ecrã a mostrar como ativar a disponibilidade do OneLake no seu Eventhouse.

Parte 2- Ativar o plugin KQL Python

Nesta etapa, você ativa o plugin python em sua Eventhouse. Esta etapa é necessária para executar o código Python de previsão de anomalias no conjunto de consultas KQL. É importante escolher o pacote correto que contém o pacote time-series-anomaly-detector .

  1. No ecrã Eventhouse, selecione a sua base de dados e, em seguida, selecione Gerir>Plugins no friso..

  2. No painel Plug-ins, alterne a extensão da linguagem Python para Ativado.

  3. Selecione Python 3.11.7 DL (visualização).

  4. Selecionar Concluído.

    Captura de tela para como habilitar o pacote python 3.11.7 DL na Eventhouse.

Parte 3- Criar um ambiente Spark

Nesta etapa, você cria um ambiente Spark para executar o notebook Python que treina o modelo de deteção de anomalias multivariadas usando o mecanismo Spark. Para obter mais informações sobre como criar ambientes, consulte Criar e gerenciar ambientes.

  1. No seletor de experiências, escolha Engenharia de Dados. Se você já estiver na experiência de Engenharia de Dados, navegue até Página inicial.

  2. Em Itens recomendados para criar, selecione Ambientes e insira o nome MVAD_ENV para o ambiente.

    Captura de tela da criação de um ambiente em Engenharia de Dados.

  3. Em Bibliotecas, selecione Bibliotecas públicas.

  4. Selecione Adicionar do PyPI.

  5. Na caixa de pesquisa, digite time-series-anomaly-detector. A versão é preenchida automaticamente com a versão mais recente. Este tutorial foi criado usando a versão 0.2.7, que é a versão incluída no Kusto Python 3.11.7 DL.

  6. Selecione Guardar.

    Captura de tela da adição do pacote PyPI ao ambiente Spark.

  7. Selecione a guia Página Inicial no ambiente.

  8. Selecione o ícone Publicar na faixa de opções.

  9. Selecione Publicar tudo. Esta etapa pode levar vários minutos para ser concluída.

    Captura de tela da publicação do ambiente.

Parte 4- Obter dados para a Eventhouse

  1. Passe o cursor sobre o banco de dados KQL onde você deseja armazenar seus dados. Selecione o menu Mais [...]>Obter arquivo local de dados>.

    Captura de tela de obter dados do arquivo local.

  2. Selecione + Nova tabela e digite demo_stocks_change como o nome da tabela.

  3. Na caixa de diálogo Carregar dados, selecione Procurar arquivos e carregue o arquivo de dados de exemplo que foi baixado nos Pré-requisitos

  4. Selecione Seguinte.

  5. Na seção Inspecionar os dados, alterne Primeira linha é o cabeçalho da coluna para Ativado.

  6. Selecione Concluir.

  7. Quando os dados forem carregados, selecione Fechar.

Parte 5- Copiar o caminho do OneLake para a tabela

Certifique-se de selecionar a tabela demo_stocks_change . No bloco Detalhes da tabela, selecione Copiar caminho para copiar o caminho do OneLake para a área de transferência. Salve esse texto copiado em um editor de texto em algum lugar para ser usado em uma etapa posterior.

Captura de tela da cópia do caminho OneLake.

Parte 6- Preparar o bloco de notas

  1. No seletor de experiências, escolha Desenvolver e selecione seu espaço de trabalho.

  2. Selecione Importar, Bloco de Anotações e, em seguida , Desse computador.

  3. Selecione Carregar e escolha o bloco de anotações que você baixou nos pré-requisitos.

  4. Depois de carregar o bloco de notas, pode localizar e abrir o bloco de notas a partir da sua área de trabalho.

  5. Na faixa de opções superior, selecione a lista suspensa Padrão do espaço de trabalho e selecione o ambiente criado na etapa anterior.

    Captura de tela mostrando a seleção do ambiente no bloco de anotações.

Parte 7- Executar o bloco de notas

  1. Importar pacotes padrão.

    import numpy as np
    import pandas as pd
    
  2. O Spark precisa de um URI ABFSS para se conectar com segurança ao armazenamento do OneLake, portanto, a próxima etapa define essa função para converter o URI do OneLake em URI ABFSS.

    def convert_onelake_to_abfss(onelake_uri):
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    uri_without_scheme = onelake_uri[8:]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    account_name = parts[0].split('.')[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
    return abfss_uri
    
  3. Insira seu URI do OneLake copiado da Parte 5- Copie o caminho do OneLake para a tabela para carregar demo_stocks_change tabela em um dataframe pandas.

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. Execute as células a seguir para preparar os dataframes de treinamento e previsão.

    Nota

    As previsões reais serão executadas em dados pela Eventhouse na parte 9- Predict-anomalies-in-the-kql-queryset. Em um cenário de produção, se você estivesse transmitindo dados para a casa de eventos, as previsões seriam feitas nos novos dados de streaming. Para o propósito do tutorial, o conjunto de dados foi dividido por data em duas seções para treinamento e previsão. Isso é para simular dados históricos e novos dados de streaming.

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. Execute as células para treinar o modelo e salvá-lo no registro de modelos Fabric MLflow.

    import mlflow
    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name="mvad_5_stocks_model",
        )
    
    # Extract the registered model path to be used for prediction using Kusto Python sandbox
    
    mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
    model_abfss = mi.latest_versions[0].source
    print(model_abfss)
    
  6. Copie o URI do modelo da saída da última célula. Você usará isso em uma próxima etapa posterior.

Parte 8- Configurar o conjunto de consultas KQL

Para obter informações gerais, consulte Criar um conjunto de consultas KQL.

  1. No seletor de experiências, escolha Inteligência em Tempo Real.
  2. Selecione a área de trabalho.
  3. Selecione +Novo item>KQL Queryset. Digite o nome MultivariateAnomalyDetectionTutorial.
  4. Selecione Criar.
  5. Na janela do hub de dados OneLake, selecione o banco de dados KQL onde você armazenou os dados.
  6. Selecione Ligar.

Parte 9- Prever anomalias no conjunto de consultas KQL

  1. Copie/cole e execute a seguinte consulta '.create-or-alter function' para definir a predict_fabric_mvad_fl() função armazenada:

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. Copie/cole a seguinte consulta de previsão.

    1. Substitua o URI do modelo de saída copiado no final da etapa 7.
    2. Executar a consulta. Ele deteta anomalias multivariadas nas cinco ações, com base no modelo treinado, e processa os resultados como anomalychart. Os pontos anômalos são renderizados na primeira unidade populacional (AAPL), embora representem anomalias multivariadas (por outras palavras, anomalias das variações conjuntas das cinco unidades populacionais na data específica).
    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc 
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

O gráfico de anomalias resultante deve ser semelhante à seguinte imagem:

Captura de tela da saída de anomalia multivariada.

Clean up resources (Limpar recursos)

Ao terminar o tutorial, você pode excluir os recursos criados para evitar incorrer em outros custos. Para eliminar os recursos, siga estes passos:

  1. Navegue até a página inicial do seu espaço de trabalho.
  2. Exclua o ambiente criado neste tutorial.
  3. Exclua o bloco de anotações criado neste tutorial.
  4. Exclua a Eventhouse ou o banco de dados usado neste tutorial.
  5. Exclua o conjunto de consultas KQL criado neste tutorial.