教學課程 3:啟用週期性具體化並執行批次推斷
本教學課程系列會示範功能如何順暢地整合機器學習生命週期的所有階段:原型設計、定型和運算化。
第一個教學課程示範如何使用自定義轉換來建立功能集規格。 然後,它示範如何使用該功能集來產生定型數據、啟用具體化,以及執行回填。 第二個教學課程示範如何啟用具體化並執行回填。 其也示範了如何實驗功能,以此來提升模型效能。
本教學課程說明如何:
- 啟用
transactions
功能集的週期性具體化。 - 在已註冊的模型上執行批次推斷管線。
必要條件
在繼續進行本教學課程之前,請務必先完成本系列的第一個和第二個教學課程。
設定
設定 Azure Machine Learning Spark 筆記本。
若要執行本教學課程,您可以建立新的筆記本,並逐步執行指示。 您也可以開啟並執行以下名稱的現有筆記本:3.啟用週期性具體化並執行批次推斷。 您可以在 featurestore_sample/notebooks 目錄中找到該筆記本,以及此系列中的所有筆記本。 您可以選擇 sdk_only 或 sdk_and_cli。 讓本教學課程保持開啟狀態,並參閱本教學課程以取得文件連結和更多說明。
在頂端導覽中的 [計算] 下拉式清單中,選取 [Azure Machine Learning 無伺服器 Spark] 底下的 [無伺服器 Spark 計算]。
設定工作階段:
- 選取頂端狀態列中的 [設定工作階段]。
- 選取 [Python 套件] 索引標籤。
- 選取 [上傳 Conda 檔案]。
- 從本機電腦中選取
azureml-examples/sdk/python/featurestore-sample/project/env/online.yml
檔案。 - (選擇性) 增加工作階段逾時 (閑置時間),以避免經常重新執行必要條件。
啟動 Spark 工作階段。
# run this cell to start the spark session (any code block will start the session ). This can take around 10 mins. print("start spark session")
設定範例的根目錄。
import os # please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to). # You can find the name from the directory structure in the left nav root_dir = "./Users/<your_user_alias>/featurestore_sample" if os.path.isdir(root_dir): print("The folder exists.") else: print("The folder does not exist. Please create or fix the path")
設定 CLI。
不適用。
初始化專案工作區 CRUD (建立、讀取、更新和刪除) 用戶端。
本教學課程筆記本會從這個目前的工作區執行。
### Initialize the MLClient of this project workspace import os from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"] project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"] # connect to the project workspace ws_client = MLClient( AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name )
初始化功能存放區變數。
若要反映您在第一個教學課程中建立的內容,請務必更新
featurestore_name
值。from azure.ai.ml import MLClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential # feature store featurestore_name = ( "<FEATURESTORE_NAME>" # use the same name from part #1 of the tutorial ) featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"] featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"] # feature store ml client fs_client = MLClient( AzureMLOnBehalfOfCredential(), featurestore_subscription_id, featurestore_resource_group_name, featurestore_name, )
初始化功能存放區 SDK 用戶端。
# feature store client from azureml.featurestore import FeatureStoreClient from azure.ai.ml.identity import AzureMLOnBehalfOfCredential featurestore = FeatureStoreClient( credential=AzureMLOnBehalfOfCredential(), subscription_id=featurestore_subscription_id, resource_group_name=featurestore_resource_group_name, name=featurestore_name, )
在交易功能集上啟用週期性具體化
在第二個教學課程中,您已啟用具體化,並已在 transactions
功能集上執行回填。 回填是隨選的一次性作業,可計算功能值並將其放入具體化存放區中。
若要處理生產環境中的模型推斷,建議您設定週期性具體化作業,讓具體化存放區保持在最新狀態。 這些作業會在使用者定義的排程上執行。 週期性作業排程的運作方式如下:
間隔和頻率值會定義時段。 例如,下列值會定義一個三小時的時段:
interval
=3
frequency
=Hour
第一個時段會從
RecurrenceTrigger
中定義的start_time
值開始,依此類推。在更新時間過後,便會在下一個時段開始時提交第一個週期性作業。
之後的週期性作業則會在第一個作業之後的每個時段提交。
如先前的教學課程所述,在將資料具體化 (回填或週期性具體化) 後,功能擷取預設會使用已具體化的資料。
from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger
transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")
# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)
fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)
print(fs_poller.result())
(選擇性) 儲存功能集資產的 YAML 檔案
請使用更新後的設定來儲存 YAML 檔案。
## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")
執行批次推斷管線
批次推斷具有下列步驟:
您使用相同的內建功能擷取元件來進行您在定型管線中所使用的功能擷取 (涵蓋在第三個教學課程中)。 針對管線定型,您已提供功能擷取規格作為元件輸入。 針對批次推斷,您傳遞已註冊的模型以作為輸入。 元件在模型成品中尋找功能擷取規格。
此外,針對定型,觀察資料具有目標變數。 不過,批次推斷觀察資料沒有目標變數。 功能擷取步驟會聯結觀察資料與功能,並輸出批次推斷的資料。
管線使用上一個步驟中的批次推斷輸入資料、在模型上執行推斷,並將預測值附加為輸出。
注意
在此範例中,您使用作業進行批次推斷。 您也可以在 Azure Machine Learning 中使用批次端點。
from azure.ai.ml import load_job # will be used later # set the batch inference pipeline path batch_inference_pipeline_path = ( root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml" ) batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path) # run the training pipeline batch_inference_pipeline_job = ws_client.jobs.create_or_update( batch_inference_pipeline_definition ) # stream the run logs ws_client.jobs.stream(batch_inference_pipeline_job.name)
檢查批次推斷的輸出資料
在管線檢視中:
在
outputs
卡片中選取inference_step
。複製
Data
欄位值。 其看起來像azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1
。在下列資料格中貼上
Data
欄位值,並使用不同的名稱和版本值。 最後一個字元是版本,前面會加上冒號 (:
)。請注意批次推斷管線產生的
predict_is_fraud
資料行。在批次推斷管線 (/project/fraud_mode/pipelines/batch_inference_pipeline.yaml) 輸出中,系統會建立一個未追蹤的數據資產,並以 GUID 作為名稱值和
1
版本值。 這是因為您未提供name
或version
的值outputs
inference_step
。 在此資料格中,您會先衍生再顯示資產的資料路徑。inf_data_output = ws_client.data.get( name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction", version="1", ) inf_output_df = spark.read.parquet(inf_data_output.path + "data/*.parquet") display(inf_output_df.head(5))
清理
本系列的第五個教學課程會說明如何刪除資源。
下一步
- 了解功能存放區概念和受管理的功能存放區中的最上層實體。
- 了解受管理的功能存放區的身分識別與存取控制。
- 檢視受管理功能存放區的疑難解答指南。
- 檢視 YAML 參考。