部署和查询特征服务终结点
本文逐步介绍如何部署和查询特征服务终结点。 本文使用 Databricks SDK。 某些步骤也可以使用 REST API 或 Databricks UI 来完成,其中提供了这些方法的参考文档。
在此示例中,有一个包含位置(纬度和经度)的城市表,以及一个考虑用户与这些城市的当前距离的推荐器应用。 由于用户的位置不断变化,因此在推理时必须计算用户与每个城市的距离。 本教程将演示如何使用 Databricks 联机表和 Databricks 特征服务以较低的延迟执行这些计算。 有关完整示例代码,请参阅示例笔记本。
步骤 1. 创建源表
源表包含预先计算的特征值,可以是 Unity Catalog 中具有主键的任何 Delta 表。 在此示例中,该表包含城市及其纬度和经度的列表。 主键是 destination_id
。 示例数据如下所示。
name | destination_id (pk) | latitude | longitude |
---|---|---|---|
Nashville, Tennessee | 0 | 36.162663 | -86.7816 |
Honolulu, Hawaii | 1 | 21.309885 | -157.85814 |
内华达州拉斯维加斯 | 2 | 36.171562 | -115.1391 |
纽约州,纽约市 | 3 | 40.712776 | -74.005974 |
步骤 2. 创建联机表
联机表是 Delta 表的只读副本,已针对联机访问进行优化。 有关详细信息,请参阅为实时特征服务使用联机表。
若要创建联机表,可以使用 UI(参阅使用 UI 创建联机表)、REST API 或 Databricks SDK,如以下示例所示:
from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
import mlflow
workspace = WorkspaceClient()
# Create an online table
feature_table_name = f"main.on_demand_demo.location_features"
online_table_name=f"main.on_demand_demo.location_features_online"
spec = OnlineTableSpec(
primary_key_columns=["destination_id"],
source_table_full_name = feature_table_name,
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
perform_full_copy=True)
# ignore "already exists" error
try:
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
pprint(workspace.online_tables.get(online_table_name))
步骤 3. 在 Unity Catalog 中创建函数
在此示例中,该函数将计算目标(其位置不会改变)和用户(其位置经常变化并且在推理时才知道)之间的距离。
# Define the function. This function calculates the distance between two locations.
function_name = f"main.on_demand_demo.distance"
spark.sql(f"""
CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
import math
lat1 = math.radians(latitude)
lon1 = math.radians(longitude)
lat2 = math.radians(user_latitude)
lon2 = math.radians(user_longitude)
# Earth's radius in kilometers
radius = 6371
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = radius * c
return distance
$$""")
步骤 4. 在 Unity Catalog 中创建特征规范
特征规范指定终结点提供的特征及其查找键。 它还指定应用于检索到的特征及其绑定的任何所需函数。 有关详细信息,请参阅创建 FeatureSpec。
from databricks.feature_engineering import FeatureLookup, FeatureFunction, FeatureEngineeringClient
fe = FeatureEngineeringClient()
features=[
FeatureLookup(
table_name=feature_table_name,
lookup_key="destination_id"
),
FeatureFunction(
udf_name=function_name,
output_name="distance",
input_bindings={
"latitude": "latitude",
"longitude": "longitude",
"user_latitude": "user_latitude",
"user_longitude": "user_longitude"
},
),
]
feature_spec_name = f"main.on_demand_demo.travel_spec"
# The following code ignores errors raised if a feature_spec with the specified name already exists.
try:
fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
步骤 5。 创建特征服务终结点
若要创建特征服务终结点,可以使用 UI(创建终结点)、REST API 或 Databricks SDK,如下所示。
特征服务终结点采用步骤 4 中创建的 feature_spec
作为参数。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
# Create endpoint
endpoint_name = "fse-location"
try:
status = workspace.serving_endpoints.create_and_wait(
name=endpoint_name,
config = EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name,
scale_to_zero_enabled=True,
workload_size="Small"
)
]
)
)
print(status)
# Get the status of the endpoint
status = workspace.serving_endpoints.get(name=endpoint_name)
print(status)
步骤 6。 查询特征服务终结点
查询终结点时,需要提供主键以及(可选)函数使用的任何上下文数据。 在此示例中,该函数采用用户的当前位置(纬度和经度)作为输入。 由于用户的位置不断变化,因此必须在推理时将其作为上下文特征提供给函数。
也可以使用 UI(参阅使用 UI 查询终结点)或 REST API 来查询终结点。
为简单起见,本示例仅计算前往两个城市的距离。 更真实的方案可能会计算用户与特征表中每个位置的距离,以确定要推荐哪些城市。
import mlflow.deployments
client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
endpoint=endpoint_name,
inputs={
"dataframe_records": [
{"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
{"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
]
},
)
pprint(response)
示例笔记本
请参阅此笔记本以获取步骤的完整说明:
使用联机表的功能服务示例笔记本
其他信息
有关使用特征工程 Python API 的详细信息,请参阅参考文档。