Compartilhar via


Detecção de Anomalias Multivariadas

Para obter informações gerais sobre a detecção de anomalias multivariadas na Inteligência em Tempo Real, confira Detecção de anomalias multivariadas no Microsoft Fabric - visão geral. Neste tutorial, você usará dados de exemplo para treinar um modelo de detecção de anomalias multivariadas usando o mecanismo Spark em um notebook do Python. Em seguida, você irá prever anomalias aplicando o modelo treinado a novos dados usando o mecanismo Eventhouse. As primeiras etapas configuram os ambientes e as etapas seguintes treinam o modelo e preveem as anomalias.

Pré-requisitos

Parte 1 - Habilitar a disponibilidade do OneLake

A disponibilidade do OneLake deve ser habilitada antes de obter dados no Eventhouse. Essa etapa é importante, pois permite a disponibilização dos dados ingeridos no OneLake. Em uma etapa posterior, esses mesmos dados serão acessados do Notebook do Spark para treinar o modelo.

  1. Navegue até a home page do espaço de trabalho na Inteligência em tempo Real.

  2. Selecione o Eventhouse que você criou nos pré-requisitos. Escolha o banco de dados para o qual deseja carregar os dados.

  3. Na seção Detalhes do banco de dados, selecione o ícone de lápis ao lado deDisponibilidade do OneLake

  4. No painel direito, alterne o botão para Ativo.

  5. Selecione Concluído.

    Captura de tela da ativação da disponibilidade do OneLake no Eventhouse.

Parte 2 - Ativar o plug-in KQL Python

Nesta etapa, você habilita o plug-in do python no Eventhouse. Esta etapa é necessária para executar o código Python de previsão anomalias no conjunto de consultas KQL. É importante escolher o pacote correto que contém o pacote time-series-anomaly-detector.

  1. Na tela Eventhouse, selecione o banco de dados e, em seguida, selecione Gerenciar>Plug-ins na faixa de opções.

  2. No painel Plug-ins, alterne a extensão de linguagem Python para Ativada.

  3. Selecione Python 3.11.7 DL (versão prévia).

  4. Selecione Concluído.

    Captura de tela de como habilitar o pacote Python 3.11.7 DL no Eventhouse.

Parte 3 - Criar um ambiente do Spark

Nesta etapa, você cria um ambiente do Spark para executar o notebook do Python que treina o modelo de detecção de anomalias multivariadas usando o mecanismo do Spark. Para obter mais informações sobre a criação de ambientes, confira Criar e gerenciar ambientes.

  1. No comutador de experiência, escolha Engenharia de Dados. Se você já estiver na experiência de Engenharia de Dados, navegue até Início.

  2. Em Itens recomendados para criar, selecione Ambientes e insira MVAD_ENV como nome do ambiente.

    Captura de tela da criação de um ambiente em Engenharia de Dados.

  3. Em Bibliotecas, selecione Bibliotecas públicas.

  4. Selecione Adicionar do PyPI.

  5. Na caixa de pesquisa, digite time-series-anomaly-detector. A versão mais recente é preenchida automaticamente. Este tutorial foi criado usando a versão 0.2.7, incluída no Kusto Python 3.11.7 DL.

  6. Selecione Salvar.

    Captura de tela da adição do pacote PyPI ao ambiente do Spark.

  7. Selecione a guia Início no ambiente.

  8. Na faixa de opções, selecione o ícone Publicar.

  9. Selecione Publicar tudo. Essa etapa pode levar vários minutos para ser concluída.

    Captura de tela da publicação do ambiente.

Parte 4 - Obter dados para o Eventhouse

  1. Passe o mouse sobre o banco de dados no qual deseja armazenar os dados. Selecione o menu Mais [...]>Obter dados>Arquivo local.

    Captura de tela de obter dados do arquivo local.

  2. Selecione + Nova tabela e digite demo_stocks_change como o nome da tabela.

  3. Na caixa de diálogo de upload de dados, selecione Procurar arquivos e carregue o arquivo de dados de exemplo baixado nos Pré-requisitos

  4. Selecione Avançar.

  5. Na seção Inspecionar os dados, alterne A primeira linha é o cabeçalho da coluna para Ativado.

  6. Selecione Concluir.

  7. Após a conclusão do carregamento, selecione Fechar.

Parte 5 - Copiar o caminho do OneLake para a tabela

Selecione a tabela demo_stocks_change. No bloco Detalhes da tabela, selecione Copiar caminho para copiar o caminho do OneLake para a área de transferência. Salve o texto copiado em um editor de texto em algum local usar em uma etapa posterior.

Captura de tela da cópia do caminho do OneLake.

Parte 6- Preparar o notebook

  1. No seletor de experiências, escolha Desenvolver e selecione seu espaço de trabalho.

  2. Selecione Importar, Notebook e, em seguida Deste computador.

  3. Selecione Upload e escolha o upload baixado nos pré-requisitos.

  4. Depois que o notebook for carregado, você poderá encontrar e abrir seu notebook no espaço de trabalho.

  5. Na faixa de opções superior, selecione a lista suspensaPadrão do workspace e selecione o ambiente que criou na etapa anterior.

    Captura de tela da seleção do ambiente no notebook.

Parte 7- Executar o notebook

  1. Importe os pacotes padrão.

    import numpy as np
    import pandas as pd
    
  2. O Spark precisa de um URI ABFSS para se conectar com segurança ao armazenamento do OneLake. Portanto, a próxima etapa define essa função para converter o URI do OneLake em URI ABFSS.

    def convert_onelake_to_abfss(onelake_uri):
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    uri_without_scheme = onelake_uri[8:]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    account_name = parts[0].split('.')[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
    return abfss_uri
    
  3. Insira o URI do OneLake copiado em Parte 5 - Copiar caminho do OneLake para a tabela para carregar a tabela demo_stocks_change em um dataframe do Pandas.

    onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI 
    abfss_uri = convert_onelake_to_abfss(onelake_uri)
    print(abfss_uri)
    
    df = spark.read.format('delta').load(abfss_uri)
    df = df.toPandas().set_index('Date')
    print(df.shape)
    df[:3]
    
  4. Execute as células a seguir para preparar os dataframes de treinamento e previsão.

    Observação

    As previsões reais serão executadas em dados pelo Eventhouse em part 9 - Predict-anomalies-in-the-kql-queryset. Em um cenário de produção, se você estivesse transmitindo dados para o eventhouse, as previsões seriam feitas nos novos dados de streaming. Para fins do tutorial, o conjunto de dados foi dividido por data em duas seções para treinamento e previsão. Isso é feito para simular dados históricos e novos dados de streaming.

    features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']
    cutoff_date = pd.to_datetime('2023-01-01')
    
    train_df = df[df.Date < cutoff_date]
    print(train_df.shape)
    train_df[:3]
    
    train_len = len(train_df)
    predict_len = len(df) - train_len
    print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
    
  5. Execute as células para treinar o modelo e salvá-lo no registro de modelos de MLflow do Fabric.

    import mlflow
    from anomaly_detector import MultivariateAnomalyDetector
    model = MultivariateAnomalyDetector()
    
    sliding_window = 200
    param   s = {"sliding_window": sliding_window}
    
    model.fit(train_df, params=params)
    
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset")
    
        model_info = mlflow.pyfunc.log_model(
            python_model=model,
            artifact_path="mvad_artifacts",
            registered_model_name="mvad_5_stocks_model",
        )
    
    # Extract the registered model path to be used for prediction using Kusto Python sandbox
    
    mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0]
    model_abfss = mi.latest_versions[0].source
    print(model_abfss)
    
  6. Copie o URI do modelo da última saída da célula. Você usará esse URI em uma etapa posterior.

Parte 8 - Configurar o conjunto de consultas KQL

Para obter informações gerais, confira Criar um conjunto de consultas KQL.

  1. No seletor de experiências, escolha Inteligência em Tempo Real.
  2. Selecione o workspace.
  3. Selecione +Novo item>Conjunto de Consultas KQL. Digite o nome MultivariateAnomalyDetectionTutorial.
  4. Selecione Criar.
  5. Na janela do hub de dados do OneLake, selecione o banco de dados KQL em que você armazenou os dados.
  6. Selecione Conectar.

Parte 9 - Prever anomalias no conjunto de consultas KQL

  1. Copie/cole e execute a seguinte consulta .create-or-alter function para definir a função armazenada predict_fabric_mvad_fl():

    .create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric")
    predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false)
    {
        let s = artifacts_uri;
        let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'),
                                 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'),
                                 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate'));
        let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result);
        let code = ```if 1:
            import os
            import shutil
            import mlflow
            model_dir = 'C:/Temp/mvad_model'
            model_data_dir = model_dir + '/data'
            os.mkdir(model_dir)
            shutil.move('C:/Temp/MLmodel', model_dir)
            shutil.move('C:/Temp/conda.yaml', model_dir)
            shutil.move('C:/Temp/requirements.txt', model_dir)
            shutil.move('C:/Temp/python_env.yaml', model_dir)
            shutil.move('C:/Temp/python_model.pkl', model_dir)
            features_cols = kargs["features_cols"]
            trim_result = kargs["trim_result"]
            test_data = df[features_cols]
            model = mlflow.pyfunc.load_model(model_dir)
            predictions = model.predict(test_data)
            predict_result = pd.DataFrame(predictions)
            samples_offset = len(df) - len(predict_result)        # this model doesn't output predictions for the first sliding_window-1 samples
            if trim_result:                                       # trim the prefix samples
                result = df[samples_offset:]
                result.iloc[:,-4:] = predict_result.iloc[:, 1:]   # no need to copy 1st column which is the timestamp index
            else:
                result = df                                       # output all samples
                result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:]
            ```;
        samples
        | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts)
    }
    
  2. Copie/cole a consulta de previsão a seguir.

    1. Substitua o URI do modelo de saída copiado no final da etapa 7.
    2. Execute a consulta. Ele detecta anomalias multivariadas nas cinco ações, com base no modelo treinado e renderiza os resultados como anomalychart. Os pontos anômalos são renderizados na primeira ação (AAPL), embora representem anomalias multivariadas, ou seja, anomalias das mudanças conjuntas das cinco ações na data específica.
    let cutoff_date=datetime(2023-01-01);
    let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count);   //  number of latest points to predict
    let sliding_window=200;                                                                 //  should match the window that was set for model training
    let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1;
    let num_samples = prefix_score_len + num_predictions;
    demo_stocks_change
    | top num_samples by Date desc 
    | order by Date asc
    | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null)
    | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'),
                // NOTE: Update artifacts_uri to model path
                artifacts_uri='enter your model URI here',
                trim_result=true)
    | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly))
    | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
    

O gráfico de anomalia resultante deve ser semelhante à imagem a seguir:

Captura de tela da saída de anomalia multivariada.

Limpar os recursos

Ao concluir o tutorial, você poderá excluir os recursos criados para evitar incorrer em outros custos. Para excluir os recursos, siga estas etapas:

  1. Navegue até a home page do espaço de trabalho.
  2. Exclua o ambiente criado neste tutorial.
  3. Exclua o notebook criado neste tutorial.
  4. Exclua o Eventhouse ou o banco de dados usado neste tutorial.
  5. Exclua o conjunto de consultas KQL criado neste tutorial.