Tutoriel 5 : développer un ensemble de fonctionnalités avec une source personnalisée
Le magasin de fonctionnalités géré d’Azure Machine Learning vous permet de découvrir, créer et opérationnaliser des fonctionnalités. Les fonctionnalités sont le tissu conjonctif du cycle de vie du Machine Learning, à commencer par la phase de prototypage, où vous expérimentez diverses fonctionnalités. Ce cycle de vie continue par la phase d’opérationnalisation, où vous déployez vos modèles, et les étapes d’inférence qui recherchent les données des fonctionnalités. Pour plus d’informations sur les magasins de fonctionnalités, consultez le document sur les concepts du magasin de fonctionnalités.
La partie 1 de cette série de tutoriels décrit comment créer une spécification d’ensemble de fonctionnalités avec des transformations personnalisées, activer la matérialisation et effectuer un renvoi. La partie 2 explique comment effectuer des expériences avec des fonctionnalités dans les flux d’expérimentation et d’entraînement. La partie 3 explique la matérialisation récurrente pour l’ensemble de fonctionnalités transactions
et montre comment exécuter un pipeline d’inférence par lot sur le modèle inscrit. La partie 4 a décrit comment exécuter l’inférence par lots.
Dans ce tutoriel, vous allez
- Définissez la logique pour charger des données à partir d’une source de données personnalisée.
- Configurez et inscrivez un ensemble de fonctionnalités à utiliser à partir de cette source de données personnalisée.
- Testez l’ensemble de fonctionnalités inscrit.
Prérequis
Remarque
Ce tutoriel utilise un notebook Azure Machine Learning avec le calcul Spark serverless.
- Assurez-vous d’avoir terminé les tutoriels précédents de cette série. Ce tutoriel réutilise le magasin de fonctionnalités et d’autres ressources créées dans les tutoriels précédents.
Configuration
Ce tutoriel utilise le SDK de base du magasin de fonctionnalités Python (azureml-featurestore
). Le SDK Python est utilisée pour les opérations de création, de lecture, de mise à jour et de suppression (CRUD) sur des magasins de fonctionnalités, des ensembles de fonctionnalités et des entités de magasin de fonctionnalités.
Vous n’avez pas besoin d’installer explicitement ces ressources pour ce tutoriel, car dans les instructions de configuration indiquées ici, le fichier conda.yml
s’en charge.
Configurer le notebook Spark Azure Machine Learning
Vous pouvez créer un nouveau notebook et exécuter étape par étape les instructions contenues dans ce tutoriel. Vous pouvez également ouvrir et exécuter le notebook existant featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Laissez ce tutoriel ouvert pour y trouver les références des liens de documentation et des explications supplémentaires.
Dans le menu supérieur, dans la liste déroulante Calcul, sélectionnez Calcul Spark serverless sous Azure Machine Learning Serverless Spark.
Configurer la session :
- Sélectionnez Configurer la session dans la barre d’état en haut.
- Sélectionnez l’onglet Packages Python.
- Sélectionnez Charger le fichier Conda.
- Chargez le fichier conda.yml que vous avez chargé dans le premier tutoriel.
- Vous pouvez également augmenter le délai d’expiration de session (temps d’inactivité) pour éviter d’avoir à réexécuter fréquemment les opérations prérequises.
Configurer le répertoire racine pour les exemples
Cette cellule de code configure le répertoire racine des exemples. Il faut environ 10 minutes pour installer toutes les dépendances et démarrer la session Spark.
import os
# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"
if os.path.isdir(root_dir):
print("The folder exists.")
else:
print("The folder does not exist. Please create or fix the path")
Initialisez le client CRUD de l’espace de travail du magasin de fonctionnalités
Initialisez le MLClient
pour l’espace de travail du magasin de fonctionnalités, afin de couvrir les opérations de création, de lecture, de mise à jour et de suppression (CRUD) sur l’espace de travail du magasin de fonctionnalités.
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
# Feature store
featurestore_name = (
"<FEATURESTORE_NAME>" # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
# Feature store ml client
fs_client = MLClient(
AzureMLOnBehalfOfCredential(),
featurestore_subscription_id,
featurestore_resource_group_name,
featurestore_name,
)
Initialisez le client Kit de développement logiciel (SDK) du magasin de fonctionnalités
Comme mentionné précédemment, ce tutoriel utilise le SDK de base du magasin de fonctionnalités Python (azureml-featurestore
). Ce client SDK initialisé est utilisé couvre les opérations de création, de lecture, de mise à jour et de suppression (CRUD) sur des magasins de fonctionnalités, des ensembles de fonctionnalités et des entités de magasins de fonctionnalités.
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
Définition de sources personnalisées
Vous pouvez définir votre propre logique de chargement de sources à partir de n’importe quel stockage de données qui a une définition de sources personnalisées. Implémentez une classe de fonction définie par l’utilisateur (UDF) du processeur de sources (CustomSourceTransformer
dans ce tutoriel) afin d’utiliser cette fonctionnalité. Cette classe doit définir une fonction __init__(self, **kwargs)
et une fonction process(self, start_time, end_time, **kwargs)
. Le dictionnaire kwargs
est fourni dans le cadre de la définition de spécification de l’ensemble de fonctionnalités. Cette définition est ensuite transmise à la fonction UDF. Les paramètres start_time
et end_time
sont calculés et transmis à la fonction UDF.
Il s’agit d’un exemple de code pour la classe UDF du processeur source :
from datetime import datetime
class CustomSourceTransformer:
def __init__(self, **kwargs):
self.path = kwargs.get("source_path")
self.timestamp_column_name = kwargs.get("timestamp_column_name")
if not self.path:
raise Exception("`source_path` is not provided")
if not self.timestamp_column_name:
raise Exception("`timestamp_column_name` is not provided")
def process(
self, start_time: datetime, end_time: datetime, **kwargs
) -> "pyspark.sql.DataFrame":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(self.path)
if start_time:
df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))
if end_time:
df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))
return df
Créer une spécification d’ensemble de fonctionnalités avec une source personnalisée et l’expérimenter localement
À présent, créez une spécification d’ensemble de fonctionnalités avec une définition de sources personnalisées et utilisez-la dans votre environnement de développement pour expérimenter l’ensemble de fonctionnalités. Le notebook du tutoriel attaché au calcul Spark serverless sert d’environnement de développement.
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
SourceProcessCode,
TransformationCode,
Column,
ColumnType,
DateTimeOffset,
TimestampColumn,
)
transactions_source_process_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)
udf_featureset_spec = create_feature_set_spec(
source=CustomFeatureSource(
kwargs={
"source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
"timestamp_column_name": "timestamp",
},
timestamp_column=TimestampColumn(name="timestamp"),
source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
source_process_code=SourceProcessCode(
path=transactions_source_process_code_path,
process_class="source_process.CustomSourceTransformer",
),
),
feature_transformation=TransformationCode(
path=transactions_feature_transform_code_path,
transformer_class="transaction_transform.TransactionFeatureTransformer",
),
index_columns=[Column(name="accountID", type=ColumnType.string)],
source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
infer_schema=True,
)
udf_featureset_spec
Ensuite, définissez une fenêtre de fonctionnalités et affichez-y les valeurs des fonctionnalités.
from datetime import datetime
st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)
display(
udf_featureset_spec.to_spark_dataframe(
feature_window_start_date_time=st, feature_window_end_date_time=et
)
)
Exporter en tant que spécification d’ensemble de fonctionnalités
Pour inscrire la spécification de l’ensemble de fonctionnalités auprès du magasin de fonctionnalités, enregistrez d’abord cette spécification dans un format spécifique. Passez en revue la spécification de l’ensemble de fonctionnalités transactions_custom_source
générée. Ouvrez ce fichier à partir de l’arborescence de fichiers pour voir la spécification : featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml
.
La spécification comporte les éléments suivants :
features
: une liste de fonctionnalités et de leur type de données.index_columns
: les clés de jointure nécessaires pour accéder aux valeurs à partir de l’ensemble de fonctionnalités.
Pour plus d’informations sur la spécification, consultez Compréhension des entités de niveau supérieur dans un magasin de fonctionnalités géré et Schéma YAML de l’ensemble de fonctionnalités CLI (v2).
La persistance de la spécification de l’ensemble de fonctionnalités offre un autre avantage : la spécification de l’ensemble de fonctionnalités peut être contrôlée par la source.
feature_spec_folder = (
root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)
udf_featureset_spec.dump(feature_spec_folder)
Inscrire l’ensemble de fonctionnalités des transactions auprès du magasin de fonctionnalités
Utilisez ce code pour inscrire une ressource d’ensemble de fonctionnalités chargée depuis une source personnalisée auprès du magasin de fonctionnalités. Vous pouvez ensuite réutiliser cette ressource et la partager facilement. L’inscription d’une ressource d’ensemble de fonctionnalités offre des capacités managées, notamment le contrôle de version et la matérialisation.
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification
transaction_fset_config = FeatureSet(
name="transactions_custom_source",
version="1",
description="transactions feature set loaded from custom source",
entities=["azureml:account:1"],
stage="Development",
specification=FeatureSetSpecification(path=feature_spec_folder),
tags={"data_type": "nonPII"},
)
poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())
Obtenez l’ensemble de fonctionnalités inscrit et imprimez les informations associées.
# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)
Tester la génération de fonctionnalités à partir d’un ensemble de fonctionnalités inscrit
Utilisez la fonction to_spark_dataframe()
de l’ensemble de fonctionnalités pour tester la génération de fonctionnalités à partir de l’ensemble de fonctionnalités inscrit et afficher les fonctionnalités.
print-txn-fset-sample-values
df = transactions_fset_config.to_spark_dataframe()
display(df)
Vous devez être en mesure d’extraire avec succès l’ensemble de fonctionnalités inscrit en tant que DataFrame Spark, puis de l’afficher. Vous pouvez désormais utiliser ces fonctionnalités pour une jointure à un instant dans le passé avec des données d’observation et les étapes suivantes dans votre pipeline Machine Learning.
Nettoyage
Si vous avez créé un groupe de ressources pour le tutoriel, vous pouvez supprimer ce groupe de ressources, ce qui supprime toutes les ressources associées à ce tutoriel. Sinon, vous pouvez supprimer les ressources individuellement :
- Pour supprimer le magasin de fonctionnalités, ouvrez le groupe de ressources dans le Portail Azure et sélectionnez le magasin de fonctionnalités, puis supprimez-le.
- La suppression du magasin de fonctionnalités ne supprime pas l’identité managée affectée par l’utilisateur (UAI) attribuée à l’espace de travail du magasin de fonctionnalités. Pour supprimer l’UAI, suivez cesinstructions.
- Pour supprimer un magasin hors connexion de type compte de stockage, ouvrez le groupe de ressources sur le Portail Azure, sélectionnez le stockage que vous avez créé, puis supprimez-le.
- Pour supprimer une instance Azure Cache pour Redis, ouvrez le groupe de ressources sur le Portail Azure, sélectionnez l’instance que vous avez créée, puis supprimez-la.