Compartilhar via


Tutorial 5: Desenvolver um conjunto de recursos com uma fonte personalizada

Um repositório de recursos gerenciados do Azure Machine Learning lhe permite descobrir, criar e operacionalizar recursos. Os recursos servem como o tecido conectivo no ciclo de vida de machine learning, começando com a fase de criação de protótipos, na qual você experimenta vários recursos. Esse ciclo de vida continua na fase de operacionalização, na qual você implanta seus modelos e as etapas de inferência pesquisam os dados do recurso. Para obter mais informações sobre repositórios de recursos, confira conceitos de repositório de recursos.

A Parte 1 desta série de tutoriais mostrou como criar uma especificação de conjunto de recursos com transformações personalizadas, habilitar a materialização e executar um backfill. A parte 2 mostrou como experimentar recursos nos fluxos de experimentação e treinamento. A parte 3 explicou a materialização recorrente para o conjunto de recursos transactions e mostrou como executar um pipeline de inferência em lote no modelo registrado. A Parte 4 descreveu como executar a inferência em lote.

Nesse tutorial, você irá

  • Defina a lógica para carregar dados de uma fonte de dados personalizada.
  • Configure e registre um conjunto de recursos para consumir a partir dessa fonte de dados personalizada.
  • Testar o conjunto de recursos registrado.

Pré-requisitos

Observação

Esse tutorial usa um notebook do Azure Machine Learning com Computação do Spark Sem Servidor.

  • Verifique se você concluiu os tutoriais anteriores desta série. Esse tutorial reutiliza o armazenamento de recursos e outros recursos criados nesses tutoriais anteriores.

Configuração

Esse tutorial usa o SDK básico do repositório de recursos do Python (azureml-featurestore). O SDK do Python é usado para operações CRUD (criar, ler, atualizar e excluir), em repositórios de recursos, conjuntos de recursos e entidades de repositório de recursos.

Você não precisa instalar esses recursos explicitamente para esse tutorial porque nas instruções de configuração mostradas aqui, o arquivo conda.yml os abrange.

Configurar o notebook Spark do Azure Machine Learning

Você pode criar um novo notebook e executar as instruções deste documento, passo a passo. Você também pode abrir e executar o notebook existente featurestore_sample/notebooks/sdk_only/5.Enable-online-store-run-inference.ipynb. Mantenha este tutorial aberto e consulte-o para obter links de documentação e mais explicações.

  1. No menu superior, na lista suspensa Compute, selecione Serverless Spark Compute em Azure Machine Learning Serverless Spark.

  2. Configure a sessão:

    1. Selecione Configurar sessão na barra de status superior.
    2. Selecione a guia Pacotes do Python, s
    3. Selecione Carregar Arquivo Conda
    4. Carregue o arquivo conda.yml que você carregou no primeiro tutorial.
    5. Opcionalmente, aumente o tempo limite da sessão (tempo ocioso) para evitar reexecuções frequentes de pré-requisito.

Configurar o diretório raiz para os exemplos

Essa célula de código configura o diretório raiz para as amostras. São necessários cerca de dez minutos para instalar todas as dependências e iniciar a sessão do Spark.

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

Inicializar o cliente CRUD do espaço de trabalho do repositório de recursos

Inicialize o MLClient para o espaço de trabalho do repositório de recursos, para abranger as operações de criação, leitura, atualização e exclusão (CRUD) no espaço de trabalho do repositório de recursos.

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

Inicializar o cliente principal do SDK do repositório de recursos

Conforme mencionamos antes, esse tutorial usa o SDK básico do repositório de recursos do Python (azureml-featurestore). Esse cliente SDK inicializado abrange operações de criação, leitura, atualização e exclusão (CRUD), em repositórios de recursos, conjuntos de recursos e entidades de repositório de recursos.

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

Definição da fonte personalizada

Você pode definir sua própria lógica de carregamento de fonte a partir de qualquer armazenamento de dados que tenha uma definição de fonte personalizada. Implemente uma classe de função definida pelo usuário (UDF) do processador de fonte (CustomSourceTransformer neste tutorial) para usar esse recurso. Essa classe deve definir uma função __init__(self, **kwargs) e uma função process(self, start_time, end_time, **kwargs). O dicionário kwargs é fornecido como parte da definição de especificação do conjunto de recursos. Em seguida, essa definição é passada para a UDF. Os parâmetros start_time e end_time são calculados e passados para a função UDF.

Este é um exemplo de código para a classe UDF do processador de origem:

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

Crie uma especificação de conjunto de recursos com uma fonte personalizada e experimente-a localmente

Agora, crie uma especificação de conjunto de recursos com uma definição de fonte personalizada e use-a em seu ambiente de desenvolvimento para fazer experiências com o conjunto de recursos. O notebook do tutorial anexado à Computação do Spark Sem Servidor funciona como o ambiente de desenvolvimento.

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

Em seguida, defina uma janela de recursos e exiba os valores dos recursos nessa janela.

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

Exportar como uma especificação de conjunto de recursos

Para registrar a especificação do conjunto de recursos no repositório de recursos, primeiro salve essa especificação em um formato específico. Examine a especificação do conjunto de recursos de transactions_custom_source gerado. Abra esse arquivo na árvore de arquivos para ver a especificação: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml.

A especificação tem os seguintes elementos:

  • features: uma lista de recursos e seus tipos de dados.
  • index_columns: as chaves de junção necessárias para acessar os valores do conjunto de recursos.

Para saber mais sobre a especificação, confira Noções básicas sobre entidades de nível superior no repositório de recursos gerenciados e o esquema YAML do conjunto de recursos da CLI (v2).

A persistência da especificação do conjunto de recursos oferece outro benefício: a especificação do conjunto de recursos pode ser controlada pela fonte.

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

Registrar o conjunto de recursos de transação com o repositório de recursos

Use esse código para registrar um ativo de conjunto de recursos carregado de uma fonte personalizada com o repositório de recursos. Em seguida, você pode reutilizar esse ativo e compartilhá-lo facilmente. O registro de ativos do conjunto de recursos oferece recursos gerenciados, incluindo controle de versão e materialização.

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

Obtenha o conjunto de recursos registrado e imprima as informações relacionadas.

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

Teste a geração de recursos a partir do conjunto de recursos registrado

Use a função to_spark_dataframe() do conjunto de recursos para testar a geração de recursos a partir do conjunto de recursos registrado e exibir os recursos. print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

Você deve ser capaz de buscar com sucesso o conjunto de recursos registrado como um dataframe do Spark e, em seguida, exibi-lo. Agora você pode usar esses recursos para uma junção pontual com dados de observação e as etapas subsequentes no pipeline de aprendizado de máquina.

Limpar

Se você criou um grupo de recursos para o tutorial, poderá excluir esse grupo de recursos, o que excluirá todos os recursos associados a este tutorial. Caso contrário, você pode excluir os recursos individualmente:

  • Para excluir o repositório de recursos: abra o grupo de recursos no portal do Azure, selecione o repositório de recursos e exclua-o.
  • A identidade gerenciada atribuída pelo usuário (UAI) atribuída ao espaço de trabalho do repositório de recursos não é excluída quando excluímos o repositório de recursos. Para excluir a UAI, siga estas instruções.
  • Para excluir um repositório offline do tipo conta de armazenamento, abra o grupo de recursos no portal do Azure, selecione o armazenamento que você criou e exclua-o.
  • Para excluir uma instância do Cache do Azure para Redis, abra o grupo de recursos no portal do Azure, selecione a instância que você criou e exclua-a.

Próximas etapas