教學課程 5:使用自訂來源開發功能集
Azure Machine Learning 受管理的功能存放區可讓您探索、建立及運作功能。 從原型設計階段開始,功能就充當著機器學習生命週期中的結締組織,您可於此階段中測試各種功能。 該生命週期會延續至運算化階段,您可於此階段中部署模型,並可透過推斷步驟查閱功能資料。 如需功能存放區的詳細資訊,請流覽 功能存放區概念 資源。
本教學課程系列的第 1 部分示範如何使用自訂轉換來建立功能集規格、啟用具體化以及執行回填。 第 2 部分示範如何在實驗和定型流程中測試功能。 第 3 部分說明 transactions
功能集的週期性具體化,並示範如何在已註冊的模型上執行批次推斷管線。 第 4 部分說明如何執行批次推斷。
在本教學課程中,您將會
- 定義從自訂資料來源載入資料的邏輯。
- 設定並註冊要從此自訂資料來源取用的功能集。
- 測試已註冊的功能集。
必要條件
注意
本教學課程使用 Azure Machine Learning 筆記本搭配無伺服器 Spark 計算。
- 請務必完成本系列先前的教學課程。 本教學課程會重複使用在先前教學課程中建立的功能存放區和其他資源。
設定
本教學課程使用 Python 功能存放區核心 SDK (azureml-featurestore
)。 Python SDK 用於在功能存放區、功能集和功能存放區實體上建立、讀取、更新和刪除 (CRUD) 作業。
您無須針對本教學課程明確安裝這些資源,因為在此處顯示的設定指示中,conda.yml
檔案涵蓋了所有相關資源。
設定 Azure Machine Learning Spark 筆記本
您可以建立新的筆記本,並逐步執行本教學課程中的指示。 您也可以開啟並執行現有的筆記本 featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb。 讓本教學課程保持開啟狀態,並參閱本教學課程以取得文件連結和更多說明。
在頂端功能表上的 [計算] 下拉式清單中,選取 [Azure Machine Learning 無伺服器 Spark] 底下的 [無伺服器 Spark 計算]。
設定工作階段:
- 選取頂端狀態列中的 [設定工作階段]
- 選取 [Python 套件] 索引標籤,選取 [上傳 Conda 檔案]
- 選取 [上傳 Conda 檔案]。
- 上傳您在第一個教學課程中上傳conda.yml檔案
- 或者,增加會話逾時(空閒時間)以避免經常重新執行必要條件
設定範例的根目錄
此程式碼資料格設定範例的根目錄。 安裝所有相依性並啟動 Spark 工作階段大約需要 10 分鐘。
import os
# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"
if os.path.isdir(root_dir):
print("The folder exists.")
else:
print("The folder does not exist. Please create or fix the path")
初始化功能存放區工作區的 CRUD 用戶端
初始化功能存放區工作區的 MLClient
,以涵蓋功能存放區工作區上的建立、讀取、更新和刪除 (CRUD) 作業。
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
# Feature store
featurestore_name = (
"<FEATURESTORE_NAME>" # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
# Feature store ml client
fs_client = MLClient(
AzureMLOnBehalfOfCredential(),
featurestore_subscription_id,
featurestore_resource_group_name,
featurestore_name,
)
初始化功能存放區核心 SDK 用戶端
如先前所述,本教學課程使用 Python 功能存放區核心 SDK (azureml-featurestore
)。 此初始化 SDK 用戶端涵蓋針對功能存放區、功能集和功能存放區實體的建立、讀取、更新和刪除 (CRUD) 作業。
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
自訂來源定義
您可以從任何具有自訂來源定義的資料儲存體中,定義自己的來源載入邏輯。 若要使用這項功能,請實作來源處理器使用者定義函式 (UDF) 類別 (在本教學課程中為 CustomSourceTransformer
)。 此類別應該定義 __init__(self, **kwargs)
函式和 process(self, start_time, end_time, **kwargs)
函式。 kwargs
字典會作為功能集規格定義的一部分提供。 接著會將此定義傳遞至 UDF。 計算 start_time
和 end_time
參數並將其傳遞至 UDF 函式。
這是來源處理器 UDF 類別的範例程式碼:
from datetime import datetime
class CustomSourceTransformer:
def __init__(self, **kwargs):
self.path = kwargs.get("source_path")
self.timestamp_column_name = kwargs.get("timestamp_column_name")
if not self.path:
raise Exception("`source_path` is not provided")
if not self.timestamp_column_name:
raise Exception("`timestamp_column_name` is not provided")
def process(
self, start_time: datetime, end_time: datetime, **kwargs
) -> "pyspark.sql.DataFrame":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(self.path)
if start_time:
df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))
if end_time:
df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))
return df
使用自訂來源建立功能集規格,並在本機進行測試
現在,使用自訂來源定義建立功能集規格,並在開發環境中將其用於測試功能集。 無伺服器 Spark 計算隨附的教學課程筆記本可作為開發環境。
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
SourceProcessCode,
TransformationCode,
Column,
ColumnType,
DateTimeOffset,
TimestampColumn,
)
transactions_source_process_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)
udf_featureset_spec = create_feature_set_spec(
source=CustomFeatureSource(
kwargs={
"source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
"timestamp_column_name": "timestamp",
},
timestamp_column=TimestampColumn(name="timestamp"),
source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
source_process_code=SourceProcessCode(
path=transactions_source_process_code_path,
process_class="source_process.CustomSourceTransformer",
),
),
feature_transformation=TransformationCode(
path=transactions_feature_transform_code_path,
transformer_class="transaction_transform.TransactionFeatureTransformer",
),
index_columns=[Column(name="accountID", type=ColumnType.string)],
source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
infer_schema=True,
)
udf_featureset_spec
接下來,定義功能視窗,並在此功能視窗中顯示功能值。
from datetime import datetime
st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)
display(
udf_featureset_spec.to_spark_dataframe(
feature_window_start_date_time=st, feature_window_end_date_time=et
)
)
匯出為功能集規格
若要向功能存放區註冊功能集規格,請先以特定格式儲存該規格。 檢閱產生的 transactions_custom_source
功能集規格。 從檔案樹狀目錄開啟此檔案,以檢視規格: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml
。
規格包含下列元素:
features
:功能及其資料類型的清單。index_columns
:要從功能集存取值所需的聯結索引鍵。
如需規格的詳細資訊,請瀏覽瞭解 受管理的功能存放區 和 CLI (v2) 功能集 YAML 架構資源中的最上層實體。
功能集規格持續性可提供另一個優點:功能集規格可由原始檔控制。
feature_spec_folder = (
root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)
udf_featureset_spec.dump(feature_spec_folder)
向功能存放區註冊交易功能集
使用此程式碼,向功能存放區註冊從自訂來源載入的功能集資產。 然後,您便可以重複使用該資產,並輕鬆地共用該資產。 註冊功能集資產可提供受控功能,包括版本設定和具體化。
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification
transaction_fset_config = FeatureSet(
name="transactions_custom_source",
version="1",
description="transactions feature set loaded from custom source",
entities=["azureml:account:1"],
stage="Development",
specification=FeatureSetSpecification(path=feature_spec_folder),
tags={"data_type": "nonPII"},
)
poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())
取得已註冊的功能集,並列印相關資訊。
# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)
從已註冊的功能集測試功能產生
使用功能集的 to_spark_dataframe()
函式,從已註冊的功能集測試功能產生,並顯示功能。
print-txn-fset-sample-values
df = transactions_fset_config.to_spark_dataframe()
display(df)
您應該能夠成功擷取已註冊的功能集作為 Spark 資料框架,然後加以顯示。 您現在可以將這些功能用於與觀察資料的時間點聯結,以及機器學習管線中的後續步驟。
清理
如果您已針對本教學課程建立資源群組,則可以刪除該資源群組,如此便能刪除與此教學課程相關聯的所有資源。 或者,您可以個別刪除資源:
- 若要刪除功能存放區,請在 Azure 入口網站中開啟資源群組,選取功能存放區並加以刪除。
- 當我們刪除功能存放區時,不會刪除指派至功能存放區工作區的使用者指派受控識別 (UAI)。 若要刪除 UAI,請遵循下列指示。
- 若要刪除儲存體帳戶類型的離線存放區,請在 Azure 入口網站中開啟資源群組,選取您所建立的儲存體,然後加以刪除。
- 若要刪除 Azure Cache for Redis 執行個體,請在 Azure 入口網站中開啟資源群組,選取您所建立的執行個體,然後加以刪除。