Tutorial 5: Desenvolver um conjunto de recursos com uma fonte personalizada
Um repositório de recursos gerenciados do Azure Machine Learning permite descobrir, criar e operacionalizar recursos. Os recursos servem como tecido conjuntivo no ciclo de vida do aprendizado de máquina, começando pela fase de prototipagem, onde você experimenta vários recursos. Esse ciclo de vida continua até a fase de operacionalização, onde você implanta seus modelos e as etapas de inferência pesquisam os dados do recurso. Para obter mais informações sobre repositórios de recursos, visite o recurso de conceitos de repositório de recursos.
A Parte 1 desta série de tutoriais mostrou como criar uma especificação de conjunto de recursos com transformações personalizadas, habilitar a materialização e executar um backfill. A Parte 2 mostrou como experimentar recursos nos fluxos de experimentação e treinamento. A Parte 3 explicou a materialização recorrente para o transactions
conjunto de recursos e mostrou como executar um pipeline de inferência em lote no modelo registrado. A Parte 4 descreveu como executar a inferência em lote.
Neste tutorial, você irá:
- Defina a lógica para carregar dados de uma fonte de dados personalizada.
- Configure e registre um conjunto de recursos a ser consumido a partir dessa fonte de dados personalizada.
- Teste o conjunto de recursos registrado.
Pré-requisitos
Nota
Este tutorial usa um bloco de anotações do Azure Machine Learning com o Serverless Spark Compute.
- Certifique-se de concluir os tutoriais anteriores desta série. Este tutorial reutiliza o repositório de recursos e outros recursos criados nesses tutoriais anteriores.
Configurar
Este tutorial usa o SDK principal do repositório de recursos Python (azureml-featurestore
). O SDK do Python é usado para criar, ler, atualizar e excluir operações (CRUD) em repositórios de recursos, conjuntos de recursos e entidades de armazenamento de recursos.
Não é necessário instalar explicitamente esses recursos para este tutorial, porque nas instruções de configuração mostradas aqui, o conda.yml
arquivo os abrange.
Configurar o bloco de anotações do Azure Machine Learning Spark
Você pode criar um novo bloco de anotações e executar as instruções neste tutorial, passo a passo. Você também pode abrir e executar o bloco de anotações existente featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Mantenha este tutorial aberto e consulte-o para obter links de documentação e mais explicações.
No menu superior, na lista suspensa Computação , selecione Computação sem servidor em Azure Machine Learning Serverless Spark.
Configure a sessão:
- Selecione Configurar sessão na barra de status superior
- Selecione a guia Pacotes Python, selecione Carregar arquivo Conda
- Selecione Carregar arquivo Conda
- Carregue o arquivo de conda.yml que você carregou no primeiro tutorial
- Opcionalmente, aumente o tempo limite da sessão (tempo ocioso) para evitar repetições frequentes de pré-requisitos
Configurar o diretório raiz para os exemplos
Esta célula de código configura o diretório raiz para os exemplos. Ele precisa de cerca de 10 minutos para instalar todas as dependências e iniciar a sessão do Spark.
import os
# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"
if os.path.isdir(root_dir):
print("The folder exists.")
else:
print("The folder does not exist. Please create or fix the path")
Inicializar o cliente CRUD do espaço de trabalho do repositório de recursos
Inicialize o MLClient
espaço de trabalho do repositório de recursos para cobrir as operações de criação, leitura, atualização e exclusão (CRUD) no espaço de trabalho do repositório de recursos.
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
# Feature store
featurestore_name = (
"<FEATURESTORE_NAME>" # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
# Feature store ml client
fs_client = MLClient(
AzureMLOnBehalfOfCredential(),
featurestore_subscription_id,
featurestore_resource_group_name,
featurestore_name,
)
Inicializar o cliente SDK principal do repositório de recursos
Como mencionado anteriormente, este tutorial usa o SDK principal do repositório de recursos Python (azureml-featurestore
). Este cliente SDK inicializado abrange operações de criação, leitura, atualização e exclusão (CRUD) em repositórios de recursos, conjuntos de recursos e entidades de armazenamento de recursos.
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
Definição de fonte personalizada
Você pode definir sua própria lógica de carregamento de origem a partir de qualquer armazenamento de dados que tenha uma definição de fonte personalizada. Implemente uma classe UDF (função definida pelo usuário) do processador de origem (CustomSourceTransformer
neste tutorial) para usar esse recurso. Esta classe deve definir uma __init__(self, **kwargs)
função e uma process(self, start_time, end_time, **kwargs)
função. O kwargs
dicionário é fornecido como parte da definição de especificação do conjunto de recursos. Esta definição é então passada para a UDF. Os start_time
parâmetros e end_time
são calculados e passados para a função UDF.
Este é o código de exemplo para a classe UDF do processador de origem:
from datetime import datetime
class CustomSourceTransformer:
def __init__(self, **kwargs):
self.path = kwargs.get("source_path")
self.timestamp_column_name = kwargs.get("timestamp_column_name")
if not self.path:
raise Exception("`source_path` is not provided")
if not self.timestamp_column_name:
raise Exception("`timestamp_column_name` is not provided")
def process(
self, start_time: datetime, end_time: datetime, **kwargs
) -> "pyspark.sql.DataFrame":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(self.path)
if start_time:
df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))
if end_time:
df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))
return df
Crie uma especificação de conjunto de recursos com uma fonte personalizada e experimente-a localmente
Agora, crie uma especificação de conjunto de recursos com uma definição de fonte personalizada e use-a em seu ambiente de desenvolvimento para experimentar o conjunto de recursos. O bloco de anotações tutorial anexado ao Serverless Spark Compute serve como o ambiente de desenvolvimento.
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
SourceProcessCode,
TransformationCode,
Column,
ColumnType,
DateTimeOffset,
TimestampColumn,
)
transactions_source_process_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)
udf_featureset_spec = create_feature_set_spec(
source=CustomFeatureSource(
kwargs={
"source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
"timestamp_column_name": "timestamp",
},
timestamp_column=TimestampColumn(name="timestamp"),
source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
source_process_code=SourceProcessCode(
path=transactions_source_process_code_path,
process_class="source_process.CustomSourceTransformer",
),
),
feature_transformation=TransformationCode(
path=transactions_feature_transform_code_path,
transformer_class="transaction_transform.TransactionFeatureTransformer",
),
index_columns=[Column(name="accountID", type=ColumnType.string)],
source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
infer_schema=True,
)
udf_featureset_spec
Em seguida, defina uma janela de recurso e exiba os valores de recurso nessa janela de recurso.
from datetime import datetime
st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)
display(
udf_featureset_spec.to_spark_dataframe(
feature_window_start_date_time=st, feature_window_end_date_time=et
)
)
Exportar como uma especificação de conjunto de recursos
Para registrar a especificação do conjunto de recursos no repositório de recursos, primeiro salve essa especificação em um formato específico. Analise a especificação do conjunto de recursos gerado transactions_custom_source
. Abra este ficheiro a partir da árvore de ficheiros para visualizar a especificação: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml
.
A especificação tem os seguintes elementos:
features
: Uma lista de recursos e seus tipos de dados.index_columns
: As chaves de junção necessárias para acessar valores do conjunto de recursos.
Para obter mais informações sobre a especificação, visite Understanding top-level entities in managed feature store and CLI (v2) feature set YAML schema resources.
A persistência da especificação do conjunto de recursos oferece outro benefício: a especificação do conjunto de recursos pode ser controlada pela fonte.
feature_spec_folder = (
root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)
udf_featureset_spec.dump(feature_spec_folder)
Registrar o conjunto de recursos de transação no repositório de recursos
Use este código para registrar um ativo de conjunto de recursos carregado de fonte personalizada com o repositório de recursos. Em seguida, você pode reutilizar esse ativo e compartilhá-lo facilmente. O registro de um ativo de conjunto de recursos oferece recursos gerenciados, incluindo versionamento e materialização.
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification
transaction_fset_config = FeatureSet(
name="transactions_custom_source",
version="1",
description="transactions feature set loaded from custom source",
entities=["azureml:account:1"],
stage="Development",
specification=FeatureSetSpecification(path=feature_spec_folder),
tags={"data_type": "nonPII"},
)
poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())
Obtenha o conjunto de recursos registrado e imprima informações relacionadas.
# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)
Testar a geração de recursos a partir do conjunto de recursos registrado
Use a to_spark_dataframe()
função do conjunto de recursos para testar a geração de recursos do conjunto de recursos registrado e exibir os recursos.
print-txn-fset-sample-values
df = transactions_fset_config.to_spark_dataframe()
display(df)
Você deve ser capaz de buscar com êxito o conjunto de recursos registrados como um dataframe do Spark e, em seguida, exibi-lo. Agora você pode usar esses recursos para uma junção point-in-time com dados de observação e as etapas subsequentes em seu pipeline de aprendizado de máquina.
Limpeza
Se você criou um grupo de recursos para o tutorial, poderá excluir esse grupo de recursos, o que exclui todos os recursos associados a este tutorial. Caso contrário, você pode excluir os recursos individualmente:
- Para excluir o repositório de recursos, abra o grupo de recursos no portal do Azure, selecione o repositório de recursos e exclua-o.
- A identidade gerenciada atribuída pelo usuário (UAI) atribuída ao espaço de trabalho do repositório de recursos não é excluída quando excluímos o repositório de recursos. Para excluir o UAI, siga estas instruções.
- Para excluir um repositório offline do tipo conta de armazenamento, abra o grupo de recursos no portal do Azure, selecione o armazenamento que você criou e exclua-o.
- Para excluir uma instância do Cache do Azure para Redis, abra o grupo de recursos no portal do Azure, selecione a instância que você criou e exclua-a.