Deteção de anomalias multivariável
Para obter informações gerais sobre a deteção de anomalias multivariadas no Real-Time Intelligence, consulte Deteção de anomalias multivariadas no Microsoft Fabric - visão geral. Neste tutorial, você usa dados de exemplo para treinar um modelo de deteção de anomalias multivariado usando o mecanismo Spark em um notebook Python. Em seguida, você prevê anomalias aplicando o modelo treinado a novos dados usando o mecanismo Eventhouse. As primeiras etapas configuram seus ambientes e as etapas seguintes treinam o modelo e preveem anomalias.
Pré-requisitos
- Um espaço de trabalho com uma capacidade habilitada para Microsoft Fabric
- Função de Administrador, Colaborador ou Membro no espaço de trabalho. Esse nível de permissão é necessário para criar itens como um Ambiente.
- Uma casa de eventos no seu espaço de trabalho com uma base de dados.
- Baixe os dados de exemplo do repositório GitHub
- Baixe o bloco de anotações do repositório GitHub
Parte 1- Ativar a disponibilidade do OneLake
A disponibilidade do OneLake deve ser habilitada antes de obter dados na Eventhouse. Esta etapa é importante, pois permite que os dados que você ingere fiquem disponíveis no OneLake. Em uma etapa posterior, você acessa esses mesmos dados do seu Spark Notebook para treinar o modelo.
No seu espaço de trabalho, selecione a Casa de Eventos que você criou nos pré-requisitos. Escolha a base de dados onde pretende armazenar os seus dados.
No painel Detalhes do Banco de Dados, alterne o botão de disponibilidade do OneLake para Ativado.
Parte 2- Ativar o plugin KQL Python
Nesta etapa, você ativa o plugin python em sua Eventhouse. Esta etapa é necessária para executar o código Python de previsão de anomalias no conjunto de consultas KQL. É importante escolher a imagem correta que contém o pacote do detetor de anomalias de séries temporais.
No ecrã Eventhouse, selecione Eventhouse>Plugins na faixa de opções.
No painel Plug-ins, alterne a extensão da linguagem Python paraAtivado.
Selecione Python 3.11.7 DL (visualização).
Selecionar Concluído.
Parte 3- Criar um ambiente Spark
Nesta etapa, você cria um ambiente Spark para executar o notebook Python que treina o modelo de deteção de anomalias multivariadas usando o mecanismo Spark. Para obter mais informações sobre como criar ambientes, consulte Criar e gerenciar ambientes.
No seu espaço de trabalho, selecione + Novo item e depois Ambiente.
Digite o nome MVAD_ENV para o ambiente e, em seguida, selecione Criar.
Na aba Home do ambiente, selecione Runtime>1.2 (Spark 3.4, Delta 2.4).
Em Bibliotecas, selecione Bibliotecas públicas.
Selecione Adicionar do PyPI.
Na caixa de pesquisa, digite time-series-anomaly-detector. A versão é preenchida automaticamente com a versão mais recente. Este tutorial foi criado usando a versão 0.3.2.
Selecione Guardar.
Selecione a guia Página Inicial no ambiente.
Selecione o ícone Publicar na faixa de opções.
Selecione Publicar tudo. Esta etapa pode levar vários minutos para ser concluída.
Parte 4- Obter dados para a Eventhouse
Passe o cursor sobre o banco de dados KQL onde você deseja armazenar seus dados. Selecione o menu Mais [...]>Obter arquivo local de dados>.
Selecione + Nova tabela e digite demo_stocks_change como o nome da tabela.
Na caixa de diálogo Carregar dados, selecione Procurar arquivos e carregue o arquivo de dados de exemplo que foi baixado nos Pré-requisitos
Selecione Seguinte.
Na seção Inspecionar os dados, alterne Primeira linha é o cabeçalho da coluna para Ativado.
Selecione Concluir.
Quando os dados forem carregados, selecione Fechar.
Parte 5- Copiar o caminho do OneLake para a tabela
Certifique-se de selecionar a tabela demo_stocks_change . No painel Detalhes da tabela, selecione a pasta OneLake para copiar o caminho do OneLake para a área de transferência. Salve esse texto copiado em um editor de texto em algum lugar para ser usado em uma etapa posterior.
Parte 6- Preparar o bloco de notas
Selecione a área de trabalho.
Selecione Importar, Bloco de Anotações e, em seguida , Desse computador.
Selecione Carregar e escolha o bloco de anotações que você baixou nos pré-requisitos.
Depois de carregar o bloco de notas, pode localizar e abrir o bloco de notas a partir da sua área de trabalho.
Na faixa de opções superior, selecione a lista suspensa Padrão do espaço de trabalho e selecione o ambiente criado na etapa anterior.
Parte 7- Executar o bloco de notas
Importar pacotes padrão.
import numpy as np import pandas as pd
O Spark precisa de um URI ABFSS para se conectar com segurança ao armazenamento do OneLake, portanto, a próxima etapa define essa função para converter o URI do OneLake em URI ABFSS.
def convert_onelake_to_abfss(onelake_uri): if not onelake_uri.startswith('https://'): raise ValueError("Invalid OneLake URI. It should start with 'https://'.") uri_without_scheme = onelake_uri[8:] parts = uri_without_scheme.split('/') if len(parts) < 3: raise ValueError("Invalid OneLake URI format.") account_name = parts[0].split('.')[0] container_name = parts[1] path = '/'.join(parts[2:]) abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}" return abfss_uri
Substitua o espaço reservado OneLakeTableURI pelo URI do OneLake copiado da Parte 5 - Copiar caminho do OneLake para a tabela para carregar a tabela demo_stocks_change em um dataframe pandas.
onelake_uri = "OneLakeTableURI" # Replace with your OneLake table URI abfss_uri = convert_onelake_to_abfss(onelake_uri) print(abfss_uri)
df = spark.read.format('delta').load(abfss_uri) df = df.toPandas().set_index('Date') print(df.shape) df[:3]
Execute as células a seguir para preparar os dataframes de treinamento e previsão.
Nota
As previsões reais serão executadas em dados pela Eventhouse na parte 9- Predict-anomalies-in-the-kql-queryset. Em um cenário de produção, se você estivesse transmitindo dados para a casa de eventos, as previsões seriam feitas nos novos dados de streaming. Para o propósito do tutorial, o conjunto de dados foi dividido por data em duas seções para treinamento e previsão. Isso é para simular dados históricos e novos dados de streaming.
features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'] cutoff_date = pd.to_datetime('2023-01-01')
train_df = df[df.Date < cutoff_date] print(train_df.shape) train_df[:3]
train_len = len(train_df) predict_len = len(df) - train_len print(f'Total samples: {len(df)}. Split to {train_len} for training, {predict_len} for testing')
Execute as células para treinar o modelo e salvá-lo no registro de modelos Fabric MLflow.
import mlflow from anomaly_detector import MultivariateAnomalyDetector model = MultivariateAnomalyDetector()
sliding_window = 200 param s = {"sliding_window": sliding_window}
model.fit(train_df, params=params)
with mlflow.start_run(): mlflow.log_params(params) mlflow.set_tag("Training Info", "MVAD on 5 Stocks Dataset") model_info = mlflow.pyfunc.log_model( python_model=model, artifact_path="mvad_artifacts", registered_model_name="mvad_5_stocks_model", )
Execute a célula a seguir para extrair o caminho do modelo registrado a ser usado para previsão usando a sandbox do Kusto Python.
mi = mlflow.search_registered_models(filter_string="name='mvad_5_stocks_model'")[0] model_abfss = mi.latest_versions[0].source print(model_abfss)
Copie o URI do modelo da última saída de célula para uso em uma etapa posterior.
Parte 8- Configurar o conjunto de consultas KQL
Para obter informações gerais, consulte Criar um conjunto de consultas KQL.
- No seu espaço de trabalho, selecione +Novo item>KQL Queryset.
- Digite o nome MultivariateAnomalyDetectionTutoriale, em seguida, selecione Criar.
- Na janela do hub de dados OneLake, selecione o banco de dados KQL onde você armazenou os dados.
- Selecione Ligar.
Parte 9- Prever anomalias no conjunto de consultas KQL
Execute a seguinte consulta '.create-or-alter function' para definir a função armazenada
predict_fabric_mvad_fl()
:.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict MVAD model in Microsoft Fabric") predict_fabric_mvad_fl(samples:(*), features_cols:dynamic, artifacts_uri:string, trim_result:bool=false) { let s = artifacts_uri; let artifacts = bag_pack('MLmodel', strcat(s, '/MLmodel;impersonate'), 'conda.yaml', strcat(s, '/conda.yaml;impersonate'), 'requirements.txt', strcat(s, '/requirements.txt;impersonate'), 'python_env.yaml', strcat(s, '/python_env.yaml;impersonate'), 'python_model.pkl', strcat(s, '/python_model.pkl;impersonate')); let kwargs = bag_pack('features_cols', features_cols, 'trim_result', trim_result); let code = ```if 1: import os import shutil import mlflow model_dir = 'C:/Temp/mvad_model' model_data_dir = model_dir + '/data' os.mkdir(model_dir) shutil.move('C:/Temp/MLmodel', model_dir) shutil.move('C:/Temp/conda.yaml', model_dir) shutil.move('C:/Temp/requirements.txt', model_dir) shutil.move('C:/Temp/python_env.yaml', model_dir) shutil.move('C:/Temp/python_model.pkl', model_dir) features_cols = kargs["features_cols"] trim_result = kargs["trim_result"] test_data = df[features_cols] model = mlflow.pyfunc.load_model(model_dir) predictions = model.predict(test_data) predict_result = pd.DataFrame(predictions) samples_offset = len(df) - len(predict_result) # this model doesn't output predictions for the first sliding_window-1 samples if trim_result: # trim the prefix samples result = df[samples_offset:] result.iloc[:,-4:] = predict_result.iloc[:, 1:] # no need to copy 1st column which is the timestamp index else: result = df # output all samples result.iloc[samples_offset:,-4:] = predict_result.iloc[:, 1:] ```; samples | evaluate python(typeof(*), code, kwargs, external_artifacts=artifacts) }
Execute a seguinte consulta de previsão, substituindo o URI do modelo de saída pelo URI copiado no final da etapa 7 .
A consulta deteta anomalias multivariadas nos cinco títulos, com base no modelo treinado, e apresenta os resultados como
anomalychart
. Os pontos anômalos são renderizados na primeira unidade populacional (AAPL), embora representem anomalias multivariadas (por outras palavras, anomalias das variações conjuntas das cinco unidades populacionais na data específica).let cutoff_date=datetime(2023-01-01); let num_predictions=toscalar(demo_stocks_change | where Date >= cutoff_date | count); // number of latest points to predict let sliding_window=200; // should match the window that was set for model training let prefix_score_len = sliding_window/2+min_of(sliding_window/2, 200)-1; let num_samples = prefix_score_len + num_predictions; demo_stocks_change | top num_samples by Date desc | order by Date asc | extend is_anomaly=bool(false), score=real(null), severity=real(null), interpretation=dynamic(null) | invoke predict_fabric_mvad_fl(pack_array('AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY'), // NOTE: Update artifacts_uri to model path artifacts_uri='enter your model URI here', trim_result=true) | summarize Date=make_list(Date), AAPL=make_list(AAPL), AMZN=make_list(AMZN), GOOG=make_list(GOOG), MSFT=make_list(MSFT), SPY=make_list(SPY), anomaly=make_list(toint(is_anomaly)) | render anomalychart with(anomalycolumns=anomaly, title='Stock Price Changest in % with Anomalies')
O gráfico de anomalias resultante deve ser semelhante à seguinte imagem:
Clean up resources (Limpar recursos)
Ao terminar o tutorial, você pode excluir os recursos criados para evitar incorrer em outros custos. Para eliminar os recursos, siga estes passos:
- Navegue até a página inicial do seu espaço de trabalho.
- Exclua o ambiente criado neste tutorial.
- Exclua o bloco de anotações criado neste tutorial.
- Exclua a Eventhouse ou o banco de dados usado neste tutorial.
- Exclua o conjunto de consultas KQL criado neste tutorial.