자습서 5: 사용자 지정 원본을 사용하여 기능 집합 개발
Azure Machine Learning 관리 기능 저장소를 사용하면 기능을 검색, 만들기 및 운용할 수 있습니다. 기능은 다양한 기능을 실험하는 프로토타입 단계부터 시작하여 기계 학습 수명 주기에서 결합 조직 역할을 합니다. 해당 수명 주기는 모델을 배포하고 추론 단계에서 기능 데이터를 찾는 운영화 단계까지 계속됩니다. 기능 저장소에 대한 자세한 내용은 기능 저장소 개념 리소스를 참조하세요 .
이 자습서 시리즈의 1부에서는 사용자 지정 변환을 사용하여 기능 집합 사양을 만들고 구체화를 사용하도록 설정하고 백필을 수행하는 방법을 보여 주었습니다. 2부에서는 실험 및 학습 흐름의 기능을 실험하는 방법을 보여 주었습니다. 3부에서는 transactions
기능 집합에 대한 되풀이 구체화를 설명하고 등록된 모델에서 일괄 처리 유추 파이프라인을 실행하는 방법을 보여 주었습니다. 4부에서는 일괄 처리 유추를 실행하는 방법을 설명했습니다.
이 자습서에서는 다음을 수행합니다.
- 사용자 지정 데이터 원본에서 데이터를 로드하는 논리를 정의합니다.
- 이 사용자 지정 데이터 원본에서 사용할 기능 집합을 구성하고 등록합니다.
- 등록된 기능 집합을 테스트합니다.
필수 조건
참고 항목
이 자습서에서는 서버리스 Spark Compute와 함께 Azure Machine Learning Notebook을 사용합니다.
- 이 시리즈의 이전 자습서를 완료해야 합니다. 이 자습서에서는 이전 자습서에서 만든 기능 저장소 및 기타 리소스를 다시 사용합니다.
설정
이 자습서에서는 Python 기능 저장소 Core SDK(azureml-featurestore
)를 사용합니다. Python SDK는 기능 저장소, 기능 집합 및 기능 저장소 엔터티에 대한 CRUD(만들기, 읽기, 업데이트 및 삭제) 작업에 사용됩니다.
이 자습서에서는 이러한 리소스를 명시적으로 설치할 필요가 없습니다. 여기에 표시된 설정 지침에서 conda.yml
파일이 이를 다루기 때문입니다.
Azure Machine Learning Spark Notebook 구성
새 Notebook을 만들고 이 자습서의 지침을 단계별로 실행할 수 있습니다. 기존 Notebook featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb를 열고 실행할 수도 있습니다. 이 자습서를 열어두고 설명서 링크와 추가 설명을 참조하세요.
상단 메뉴의 컴퓨팅 드롭다운 목록에서 Azure Machine Learning Serverless Spark 아래의 서버리스 Spark 컴퓨팅을 선택합니다.
세션 구성:
- 상단 상태 표시줄에서 세션 구성을 선택합니다.
- Python 패키지 탭을 선택하고 Conda 파일 업로드를 선택합니다.
- Conda 파일 업로드를 선택합니다.
- 첫 번째 자습서에서 업로드한 conda.yml 파일 업로드
- 필요에 따라 자주 필요한 재실행을 방지하기 위해 세션 시간 제한(유휴 시간)을 늘림
샘플의 루트 디렉터리 설정
이 코드 셀은 샘플의 루트 디렉터리를 설정합니다. 모든 종속성을 설치하고 Spark 세션을 시작하는 데 약 10분이 소요됩니다.
import os
# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"
if os.path.isdir(root_dir):
print("The folder exists.")
else:
print("The folder does not exist. Please create or fix the path")
기능 저장소 작업 영역의 CRUD 클라이언트 초기화
기능 저장소 작업 영역의 CRUD(만들기, 읽기, 업데이트 및 삭제) 작업을 처리하기 위해 기능 저장소 작업 영역에 대한 MLClient
를 초기화합니다.
from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
# Feature store
featurestore_name = (
"<FEATURESTORE_NAME>" # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
# Feature store ml client
fs_client = MLClient(
AzureMLOnBehalfOfCredential(),
featurestore_subscription_id,
featurestore_resource_group_name,
featurestore_name,
)
기능 저장소 코어 SDK 클라이언트를 초기화
앞서 언급했듯이 이 자습서에서는 Python 기능 저장소 Core SDK(azureml-featurestore
)를 사용합니다. 이 초기화된 SDK 클라이언트는 기능 저장소, 기능 집합 및 기능 저장소 엔터티에 대한 CRUD(만들기, 읽기, 업데이트 및 삭제) 작업을 다룹니다.
from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
featurestore = FeatureStoreClient(
credential=AzureMLOnBehalfOfCredential(),
subscription_id=featurestore_subscription_id,
resource_group_name=featurestore_resource_group_name,
name=featurestore_name,
)
사용자 지정 원본 정의
사용자 지정 원본 정의가 있는 모든 데이터 스토리지에서 고유한 원본 로딩 논리를 정의할 수 있습니다. 이 함수를 사용하려면 원본 프로세서 UDF(사용자 정의 함수) 클래스(이 자습서의 CustomSourceTransformer
)를 구현합니다. 이 클래스는 __init__(self, **kwargs)
함수와 process(self, start_time, end_time, **kwargs)
함수를 정의해야 합니다. kwargs
사전은 기능 집합 사양 정의의 일부로 제공됩니다. 그런 다음 이 정의가 UDF로 전달됩니다. start_time
및 end_time
매개 변수가 계산되어 UDF 함수에 전달됩니다.
다음은 원본 프로세서 UDF 클래스의 샘플 코드입니다.
from datetime import datetime
class CustomSourceTransformer:
def __init__(self, **kwargs):
self.path = kwargs.get("source_path")
self.timestamp_column_name = kwargs.get("timestamp_column_name")
if not self.path:
raise Exception("`source_path` is not provided")
if not self.timestamp_column_name:
raise Exception("`timestamp_column_name` is not provided")
def process(
self, start_time: datetime, end_time: datetime, **kwargs
) -> "pyspark.sql.DataFrame":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(self.path)
if start_time:
df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))
if end_time:
df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))
return df
사용자 지정 원본으로 기능 집합 사양을 만들고 로컬에서 실험해 보세요.
이제 사용자 지정 원본 정의로 기능 집합 사양을 만들고 이를 개발 환경에서 사용하여 기능 집합을 실험해 보세요. 서버리스 Spark Compute에 연결된 자습서 Notebook은 개발 환경 역할을 합니다.
from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
SourceProcessCode,
TransformationCode,
Column,
ColumnType,
DateTimeOffset,
TimestampColumn,
)
transactions_source_process_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
root_dir
+ "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)
udf_featureset_spec = create_feature_set_spec(
source=CustomFeatureSource(
kwargs={
"source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
"timestamp_column_name": "timestamp",
},
timestamp_column=TimestampColumn(name="timestamp"),
source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
source_process_code=SourceProcessCode(
path=transactions_source_process_code_path,
process_class="source_process.CustomSourceTransformer",
),
),
feature_transformation=TransformationCode(
path=transactions_feature_transform_code_path,
transformer_class="transaction_transform.TransactionFeatureTransformer",
),
index_columns=[Column(name="accountID", type=ColumnType.string)],
source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
infer_schema=True,
)
udf_featureset_spec
다음으로 기능 창을 정의하고 이 기능 창에 기능 값을 표시합니다.
from datetime import datetime
st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)
display(
udf_featureset_spec.to_spark_dataframe(
feature_window_start_date_time=st, feature_window_end_date_time=et
)
)
기능 집합 사양으로 내보내기
기능 집합 사양을 기능 저장소에 등록하려면 먼저 해당 사양을 특정 형식으로 저장합니다. 생성된 transactions_custom_source
기능 집합 사양을 검토합니다. 파일 트리에서 이 파일을 열어 사양 featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml
을 확인합니다.
사양에는 다음 요소가 있습니다.
features
: 기능 및 해당 데이터 형식의 목록입니다.index_columns
: 기능 집합의 값에 액세스하는 데 필요한 조인 키입니다.
사양에 대한 자세한 내용은 관리되는 네트워크 격리 및 CLI(v2) 기능 집합 YAML 스키마 리소스의 최상위 엔터티 이해를 참조하세요.
기능 집합 사양 지속성은 또 다른 이점을 제공합니다. 즉, 기능 집합 사양을 원본으로 제어할 수 있습니다.
feature_spec_folder = (
root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)
udf_featureset_spec.dump(feature_spec_folder)
기능 저장소에 트랜잭션 기능 집합을 등록합니다.
이 코드를 사용하여 사용자 지정 원본에서 로드된 기능 집합 자산을 기능 저장소에 등록합니다. 그런 다음 해당 자산을 재사용하고 쉽게 공유할 수 있습니다. 기능 집합 자산을 등록하면 버전 관리 및 구체화를 포함한 관리 기능이 제공됩니다.
from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification
transaction_fset_config = FeatureSet(
name="transactions_custom_source",
version="1",
description="transactions feature set loaded from custom source",
entities=["azureml:account:1"],
stage="Development",
specification=FeatureSetSpecification(path=feature_spec_folder),
tags={"data_type": "nonPII"},
)
poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())
등록된 기능 집합을 가져오고 관련 정보를 인쇄합니다.
# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)
등록된 기능 집합에서 기능 생성 테스트
등록된 함수 집합에서 함수 생성을 테스트하고 함수를 표시하려면 함수 집합의 to_spark_dataframe()
함수를 사용합니다.
print-txn-fset-sample-values
df = transactions_fset_config.to_spark_dataframe()
display(df)
등록된 기능 집합을 Spark 데이터 프레임으로 성공적으로 가져온 다음 표시할 수 있어야 합니다. 이제 관찰 데이터와의 특정 시점 조인 및 기계 학습 파이프라인의 후속 단계에 이러한 기능을 사용할 수 있습니다.
정리
자습서용 리소스 그룹을 만든 경우 해당 리소스 그룹을 삭제하면 이 자습서와 연결된 모든 리소스가 삭제됩니다. 그렇지 않으면 리소스를 개별적으로 삭제할 수 있습니다.
- 기능 저장소를 삭제하려면 Azure Portal에서 리소스 그룹을 열고 기능 저장소를 선택한 후 삭제합니다.
- 기능 저장소 작업 영역에 할당된 UAI(사용자가 할당한 관리 ID)는 기능 저장소를 삭제할 때 삭제되지 않습니다. UAI를 삭제하려면 이 지침을 따릅니다.
- 스토리지 계정형 오프라인 저장소를 삭제하려면 Azure Portal에서 리소스 그룹을 열고, 만든 스토리지를 선택하고 삭제합니다.
- Azure Cache for Redis 인스턴스를 삭제하려면 Azure Portal에서 리소스 그룹을 열고, 만든 인스턴스를 선택한 후 삭제합니다.