Samouczek 5. Tworzenie zestawu funkcji przy użyciu niestandardowego źródła
Usługa Azure Machine Learning magazyn zarządzanych funkcji umożliwia odnajdywanie, tworzenie i operacjonalizacja funkcji. Funkcje służą jako tkanka łącząca w cyklu życia uczenia maszynowego, począwszy od fazy tworzenia prototypów, w której eksperymentujesz z różnymi funkcjami. Ten cykl życia jest kontynuowany w fazie operacjonalizacji, w której wdrażasz modele i kroki wnioskowania wyszukują dane funkcji. Aby uzyskać więcej informacji na temat magazynów funkcji, odwiedź zasób pojęć dotyczących magazynu funkcji.
W części 1 tej serii samouczków pokazano, jak utworzyć specyfikację zestawu funkcji z niestandardowymi przekształceniami, włączyć materializację i wykonać wypełnienie. Część 2 pokazała, jak eksperymentować z funkcjami w przepływach eksperymentowania i trenowania. W części 3 wyjaśniono rekursywną materializację transactions
zestawu funkcji i pokazano, jak uruchomić potok wnioskowania wsadowego w zarejestrowanym modelu. Część 4 opisała sposób uruchamiania wnioskowania wsadowego.
W tym samouczku wykonasz następujące instrukcje:
- Zdefiniuj logikę ładowania danych z niestandardowego źródła danych.
- Skonfiguruj i zarejestruj zestaw funkcji do korzystania z tego niestandardowego źródła danych.
- Przetestuj zarejestrowany zestaw funkcji.
Wymagania wstępne
Uwaga
W tym samouczku jest używany notes usługi Azure Machine Learning z bezserwerowymi obliczeniami platformy Spark.
- Pamiętaj, aby ukończyć poprzednie samouczki z tej serii. W tym samouczku jest używany magazyn funkcji i inne zasoby utworzone we wcześniejszych samouczkach.
Konfiguruj
W tym samouczku jest używany podstawowy zestaw SDK magazynu funkcji języka Python (azureml-featurestore
). Zestaw SDK języka Python służy do tworzenia, odczytu, aktualizowania i usuwania operacji (CRUD) w magazynach funkcji, zestawach funkcji i jednostkach magazynu funkcji.
Nie musisz jawnie instalować tych zasobów na potrzeby tego samouczka, ponieważ w instrukcjach konfiguracji przedstawionych tutaj conda.yml
plik obejmuje te zasoby.
Konfigurowanie notesu platformy Spark usługi Azure Machine Learning
Możesz utworzyć nowy notes i wykonać instrukcje w tym samouczku krok po kroku. Możesz również otworzyć i uruchomić istniejący notes featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb. Pozostaw ten samouczek otwarty i zapoznaj się z nim, aby uzyskać linki do dokumentacji i więcej wyjaśnień.
W górnym menu z listy rozwijanej Obliczenia wybierz pozycję Bezserwerowe obliczenia Spark w obszarze Bezserwerowa platforma Spark w usłudze Azure Machine Learning Serverless Spark.
Skonfiguruj sesję:
- Wybierz pozycję Konfiguruj sesję na górnym pasku stanu
- Wybierz kartę Pakiety języka Python, wybierz pozycję Przekaż plik Conda
- Wybieranie pozycji Przekaż plik Conda
- Przekaż plik conda.yml przekazany w pierwszym samouczku
- Opcjonalnie zwiększ limit czasu sesji (czas bezczynności), aby uniknąć częstych ponownych uruchomień wymagań wstępnych
Konfigurowanie katalogu głównego dla przykładów
Ta komórka kodu konfiguruje katalog główny dla przykładów. Wymaga około 10 minut, aby zainstalować wszystkie zależności i uruchomić sesję platformy Spark.
import os
# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"
if os.path.isdir(root_dir):
print("The folder exists.")
else:
print("The folder does not exist. Please create or fix the path")
Inicjowanie klienta CRUD obszaru roboczego magazynu funkcji
Zainicjuj MLClient
obszar roboczy magazynu funkcji, aby uwzględnić operacje tworzenia, odczytu, aktualizacji i usuwania (CRUD) w obszarze roboczym magazynu funkcji.
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
# Feature store
featurestore_name = (
"<FEATURESTORE_NAME>" # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
# Feature store ml client
fs_client = MLClient(
AzureMLOnBehalfOfCredential(),
featurestore_subscription_id,
featurestore_resource_group_name,
featurestore_name,
)
Inicjowanie klienta podstawowego zestawu SDK magazynu funkcji
Jak wspomniano wcześniej, w tym samouczku jest używany podstawowy zestaw SDK magazynu funkcji języka Python (azureml-featurestore
). Ten zainicjowany klient zestawu SDK obejmuje operacje tworzenia, odczytu, aktualizacji i usuwania (CRUD) w magazynach funkcji, zestawach funkcji i jednostkach magazynu funkcji.
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
Niestandardowa definicja źródła
Możesz zdefiniować własną logikę ładowania źródła z dowolnego magazynu danych, który ma niestandardową definicję źródła. Zaimplementuj klasę funkcji zdefiniowanej przez użytkownika (UDF) procesora źródłowego (CustomSourceTransformer
w tym samouczku), aby użyć tej funkcji. Ta klasa powinna definiować __init__(self, **kwargs)
funkcję i process(self, start_time, end_time, **kwargs)
funkcję. Słownik kwargs
jest dostarczany jako część definicji specyfikacji zestawu funkcji. Ta definicja jest następnie przekazywana do funkcji zdefiniowanej przez użytkownika. Parametry start_time
i end_time
są obliczane i przekazywane do funkcji UDF.
Jest to przykładowy kod dla klasy UDF procesora źródłowego:
from datetime import datetime
class CustomSourceTransformer:
def __init__(self, **kwargs):
self.path = kwargs.get("source_path")
self.timestamp_column_name = kwargs.get("timestamp_column_name")
if not self.path:
raise Exception("`source_path` is not provided")
if not self.timestamp_column_name:
raise Exception("`timestamp_column_name` is not provided")
def process(
self, start_time: datetime, end_time: datetime, **kwargs
) -> "pyspark.sql.DataFrame":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(self.path)
if start_time:
df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))
if end_time:
df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))
return df
Tworzenie specyfikacji zestawu funkcji z niestandardowym źródłem i eksperymentowanie z nim lokalnie
Teraz utwórz specyfikację zestawu funkcji z niestandardową definicją źródła i użyj jej w środowisku projektowym, aby eksperymentować z zestawem funkcji. Notes samouczka dołączony do bezserwerowej usługi Spark Compute służy jako środowisko programistyczne.
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
SourceProcessCode,
TransformationCode,
Column,
ColumnType,
DateTimeOffset,
TimestampColumn,
)
transactions_source_process_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)
udf_featureset_spec = create_feature_set_spec(
source=CustomFeatureSource(
kwargs={
"source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
"timestamp_column_name": "timestamp",
},
timestamp_column=TimestampColumn(name="timestamp"),
source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
source_process_code=SourceProcessCode(
path=transactions_source_process_code_path,
process_class="source_process.CustomSourceTransformer",
),
),
feature_transformation=TransformationCode(
path=transactions_feature_transform_code_path,
transformer_class="transaction_transform.TransactionFeatureTransformer",
),
index_columns=[Column(name="accountID", type=ColumnType.string)],
source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
infer_schema=True,
)
udf_featureset_spec
Następnie zdefiniuj okno funkcji i wyświetl wartości funkcji w tym oknie funkcji.
from datetime import datetime
st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)
display(
udf_featureset_spec.to_spark_dataframe(
feature_window_start_date_time=st, feature_window_end_date_time=et
)
)
Eksportowanie jako specyfikacja zestawu funkcji
Aby zarejestrować specyfikację zestawu funkcji w magazynie funkcji, najpierw zapisz tę specyfikację w określonym formacie. Przejrzyj wygenerowaną transactions_custom_source
specyfikację zestawu funkcji. Otwórz ten plik z drzewa plików, aby wyświetlić specyfikację: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml
.
Specyfikacja zawiera następujące elementy:
features
: lista funkcji i ich typów danych.index_columns
: klucze sprzężenia wymagane do uzyskania dostępu do wartości z zestawu funkcji.
Aby uzyskać więcej informacji na temat specyfikacji, odwiedź stronę Understanding top-level entities in magazyn zarządzanych funkcji and CLI (v2) feature set YAML schema resources (Omówienie jednostek najwyższego poziomu w magazyn zarządzanych funkcji i interfejsie wiersza polecenia (v2) zestawów schematów YAML.
Trwałość specyfikacji zestawu funkcji zapewnia kolejną korzyść: specyfikacja zestawu funkcji może być kontrolowana przez źródło.
feature_spec_folder = (
root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)
udf_featureset_spec.dump(feature_spec_folder)
Rejestrowanie zestawu funkcji transakcji w magazynie funkcji
Użyj tego kodu, aby zarejestrować zasób zestawu funkcji załadowany ze źródła niestandardowego do magazynu funkcji. Następnie możesz ponownie użyć tego zasobu i łatwo go udostępnić. Rejestracja zasobu zestawu funkcji oferuje funkcje zarządzane, w tym przechowywanie wersji i materializacja.
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification
transaction_fset_config = FeatureSet(
name="transactions_custom_source",
version="1",
description="transactions feature set loaded from custom source",
entities=["azureml:account:1"],
stage="Development",
specification=FeatureSetSpecification(path=feature_spec_folder),
tags={"data_type": "nonPII"},
)
poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())
Uzyskaj zarejestrowany zestaw funkcji i wyświetl powiązane informacje.
# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)
Testowanie generowania funkcji z zarejestrowanego zestawu funkcji
to_spark_dataframe()
Użyj funkcji zestawu funkcji, aby przetestować generowanie funkcji z zarejestrowanego zestawu funkcji i wyświetlić funkcje.
print-txn-fset-sample-values
df = transactions_fset_config.to_spark_dataframe()
display(df)
Powinno być możliwe pomyślne pobranie zarejestrowanego zestawu funkcji jako ramki danych platformy Spark, a następnie wyświetlenie go. Te funkcje można teraz używać do łączenia do punktu w czasie z danymi obserwacji oraz kolejnych kroków w potoku uczenia maszynowego.
Czyszczenie
Jeśli utworzono grupę zasobów na potrzeby tego samouczka, możesz usunąć tę grupę zasobów, co spowoduje usunięcie wszystkich zasobów skojarzonych z tym samouczkiem. W przeciwnym razie możesz usunąć zasoby indywidualnie:
- Aby usunąć magazyn funkcji, otwórz grupę zasobów w witrynie Azure Portal, wybierz magazyn funkcji i usuń ją.
- Tożsamość zarządzana przypisana przez użytkownika (UAI) przypisana do obszaru roboczego magazynu funkcji nie jest usuwana po usunięciu magazynu funkcji. Aby usunąć interfejs użytkownika, postępuj zgodnie z tymi instrukcjami.
- Aby usunąć magazyn typu konta magazynu w trybie offline, otwórz grupę zasobów w witrynie Azure Portal, wybierz utworzony magazyn i usuń go.
- Aby usunąć wystąpienie usługi Azure Cache for Redis, otwórz grupę zasobów w witrynie Azure Portal, wybierz utworzone wystąpienie i usuń je.